DSPatch v.11.2.0
Loading...
Searching...
No Matches
Circuit.h
1/******************************************************************************
2DSPatch - The Refreshingly Simple C++ Dataflow Framework
3Copyright (c) 2024, Marcus Tomlinson
4
5BSD 2-Clause License
6
7Redistribution and use in source and binary forms, with or without
8modification, are permitted provided that the following conditions are met:
9
101. Redistributions of source code must retain the above copyright notice, this
11 list of conditions and the following disclaimer.
12
132. Redistributions in binary form must reproduce the above copyright notice,
14 this list of conditions and the following disclaimer in the documentation
15 and/or other materials provided with the distribution.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27******************************************************************************/
28
29#pragma once
30
31#include "Component.h"
32
33#ifdef _WIN32
34#define WIN32_LEAN_AND_MEAN
35#include <windows.h>
36#undef WIN32_LEAN_AND_MEAN
37#endif
38
39#include <algorithm>
40#include <condition_variable>
41#include <set>
42#include <thread>
43
44namespace DSPatch
45{
46
48
72class Circuit final
73{
74public:
75 Circuit( const Circuit& ) = delete;
76 Circuit& operator=( const Circuit& ) = delete;
77
78 Circuit();
79 ~Circuit();
80
81 bool AddComponent( const Component::SPtr& component );
82
83 bool RemoveComponent( const Component::SPtr& component );
84 void RemoveAllComponents();
85
86 int GetComponentCount() const;
87
88 bool ConnectOutToIn( const Component::SPtr& fromComponent, int fromOutput, const Component::SPtr& toComponent, int toInput );
89
90 bool DisconnectComponent( const Component::SPtr& component );
91 void DisconnectAllComponents();
92
93 void SetBufferCount( int bufferCount );
94 int GetBufferCount() const;
95
96 void SetThreadCount( int threadCount );
97 int GetThreadCount() const;
98
99 void Tick();
100 void Sync();
101
102 void StartAutoTick();
103 void StopAutoTick();
104 void PauseAutoTick();
105 void ResumeAutoTick();
106
107 void Optimize();
108
109private:
110 class AutoTickThread final
111 {
112 public:
113 AutoTickThread( const AutoTickThread& ) = delete;
114 AutoTickThread& operator=( const AutoTickThread& ) = delete;
115
116 inline AutoTickThread() = default;
117
118 inline ~AutoTickThread()
119 {
120 Stop();
121 }
122
123 inline void Start( DSPatch::Circuit* circuit )
124 {
125 if ( !_stopped )
126 {
127 Resume();
128 return;
129 }
130
131 _circuit = circuit;
132
133 _stop = false;
134 _stopped = false;
135 _pause = false;
136
137 _thread = std::thread( &AutoTickThread::_Run, this );
138 }
139
140 inline void Stop()
141 {
142 _stop = true;
143
144 if ( _thread.joinable() )
145 {
146 _thread.join();
147 }
148 }
149
150 inline void Pause()
151 {
152 if ( !_stopped && ++pauseCount == 1 )
153 {
154 std::unique_lock<std::mutex> lock( _resumeMutex );
155 _pause = true;
156 _pauseCondt.wait( lock ); // wait for pause
157 }
158 }
159
160 inline void Resume()
161 {
162 if ( _pause && --pauseCount == 0 )
163 {
164 _pause = false;
165 _resumeCondt.notify_all();
166 std::this_thread::yield();
167 }
168 }
169
170 private:
171 inline void _Run()
172 {
173 if ( _circuit )
174 {
175 while ( !_stop )
176 {
177 _circuit->Tick();
178
179 if ( _pause )
180 {
181 std::unique_lock<std::mutex> lock( _resumeMutex );
182
183 _pauseCondt.notify_all();
184 _resumeCondt.wait( lock ); // wait for resume
185 }
186 }
187 }
188
189 _stopped = true;
190 }
191
192 std::thread _thread;
193 DSPatch::Circuit* _circuit = nullptr;
194 int pauseCount = 0;
195 bool _stop = false;
196 bool _pause = false;
197 bool _stopped = true;
198 std::mutex _resumeMutex;
199 std::condition_variable _resumeCondt, _pauseCondt;
200 };
201
202 class CircuitThread final
203 {
204 public:
205 CircuitThread( const CircuitThread& ) = delete;
206 CircuitThread& operator=( const CircuitThread& ) = delete;
207
208 inline CircuitThread() = default;
209
210 // cppcheck-suppress missingMemberCopy
211 inline CircuitThread( CircuitThread&& )
212 {
213 }
214
215 inline ~CircuitThread()
216 {
217 Stop();
218 }
219
220 inline void Start( std::vector<DSPatch::Component*>* components, int bufferNo )
221 {
222 _components = components;
223 _bufferNo = bufferNo;
224
225 _stop = false;
226 _gotSync = false;
227
228 _thread = std::thread( &CircuitThread::_Run, this );
229 }
230
231 inline void Stop()
232 {
233 _stop = true;
234
235 Resume();
236
237 if ( _thread.joinable() )
238 {
239 _thread.join();
240 }
241 }
242
243 inline void Sync()
244 {
245 std::unique_lock<std::mutex> lock( _syncMutex );
246
247 if ( !_gotSync ) // if haven't already got sync
248 {
249 _syncCondt.wait( lock ); // wait for sync
250 }
251 }
252
253 inline void Resume()
254 {
255 _gotSync = false; // reset the sync flag
256 _resumeCondt.notify_all();
257 std::this_thread::yield();
258 }
259
260 inline void SyncAndResume()
261 {
262 Sync();
263 Resume();
264 }
265
266 private:
267 inline void _Run()
268 {
269#ifdef _WIN32
270 SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_HIGHEST );
271#else
272 sched_param sch_params;
273 sch_params.sched_priority = sched_get_priority_max( SCHED_RR );
274 pthread_setschedparam( pthread_self(), SCHED_RR, &sch_params );
275#endif
276
277 if ( _components )
278 {
279 while ( !_stop )
280 {
281 {
282 std::unique_lock<std::mutex> lock( _syncMutex );
283
284 _gotSync = true; // set the sync flag
285 _syncCondt.notify_all();
286 _resumeCondt.wait( lock ); // wait for resume
287 }
288
289 // cppcheck-suppress knownConditionTrueFalse
290 if ( !_stop )
291 {
292 // You might be thinking: Can't we have each thread start on a different component?
293
294 // Well no. In order to maintain synchronisation within the circuit, when a component
295 // wants to process its buffers in-order, it requires that every other in-order
296 // component in the system has not only processed its buffers in the same order, but
297 // has processed the same number of buffers too.
298
299 // E.g. 1,2,3 and 1,2,3. Not 1,2,3 and 2,3,1,2,3.
300
301 for ( auto component : *_components )
302 {
303 component->Tick( _bufferNo );
304 }
305 }
306 }
307 }
308 }
309
310 std::thread _thread;
311 std::vector<DSPatch::Component*>* _components = nullptr;
312 int _bufferNo = 0;
313 bool _stop = false;
314 bool _gotSync = false;
315 std::mutex _syncMutex;
316 std::condition_variable _resumeCondt, _syncCondt;
317 };
318
319 class CircuitThreadParallel final
320 {
321 public:
322 CircuitThreadParallel( const CircuitThreadParallel& ) = delete;
323 CircuitThreadParallel& operator=( const CircuitThreadParallel& ) = delete;
324
325 inline CircuitThreadParallel() = default;
326
327 // cppcheck-suppress missingMemberCopy
328 inline CircuitThreadParallel( CircuitThreadParallel&& )
329 {
330 }
331
332 inline ~CircuitThreadParallel()
333 {
334 Stop();
335 }
336
337 inline void Start( std::vector<DSPatch::Component*>* components, int bufferNo, int threadNo, int threadCount )
338 {
339 _components = components;
340 _bufferNo = bufferNo;
341 _threadNo = threadNo;
342 _threadCount = threadCount;
343
344 _stop = false;
345 _gotSync = false;
346
347 _thread = std::thread( &CircuitThreadParallel::_Run, this );
348 }
349
350 inline void Stop()
351 {
352 _stop = true;
353
354 Resume();
355
356 if ( _thread.joinable() )
357 {
358 _thread.join();
359 }
360 }
361
362 inline void Sync()
363 {
364 std::unique_lock<std::mutex> lock( _syncMutex );
365
366 if ( !_gotSync ) // if haven't already got sync
367 {
368 _syncCondt.wait( lock ); // wait for sync
369 }
370 }
371
372 inline void Resume()
373 {
374 _gotSync = false; // reset the sync flag
375 _resumeCondt.notify_all();
376 std::this_thread::yield();
377 }
378
379 private:
380 inline void _Run()
381 {
382#ifdef _WIN32
383 SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_HIGHEST );
384#else
385 sched_param sch_params;
386 sch_params.sched_priority = sched_get_priority_max( SCHED_RR );
387 pthread_setschedparam( pthread_self(), SCHED_RR, &sch_params );
388#endif
389
390 if ( _components )
391 {
392 while ( !_stop )
393 {
394 {
395 std::unique_lock<std::mutex> lock( _syncMutex );
396
397 _gotSync = true; // set the sync flag
398 _syncCondt.notify_all();
399 _resumeCondt.wait( lock ); // wait for resume
400 }
401
402 // cppcheck-suppress knownConditionTrueFalse
403 if ( !_stop )
404 {
405 for ( auto it = _components->begin() + _threadNo; it < _components->end(); it += _threadCount )
406 {
407 ( *it )->TickParallel( _bufferNo );
408 }
409 }
410 }
411 }
412 }
413
414 std::thread _thread;
415 std::vector<DSPatch::Component*>* _components = nullptr;
416 int _bufferNo = 0;
417 int _threadNo = 0;
418 int _threadCount = 0;
419 bool _stop = false;
420 bool _gotSync = false;
421 std::mutex _syncMutex;
422 std::condition_variable _resumeCondt, _syncCondt;
423 };
424
425 void _Optimize();
426
427 int _bufferCount = 0;
428 int _threadCount = 0;
429 int _currentBuffer = 0;
430
431 AutoTickThread _autoTickThread;
432
433 std::set<DSPatch::Component::SPtr> _componentsSet;
434
435 std::vector<DSPatch::Component*> _components;
436 std::vector<DSPatch::Component*> _componentsParallel;
437
438 std::vector<CircuitThread> _circuitThreads;
439 std::vector<std::vector<CircuitThreadParallel>> _circuitThreadsParallel;
440
441 bool _circuitDirty = false;
442};
443
444inline Circuit::Circuit() = default;
445
446inline Circuit::~Circuit()
447{
448 StopAutoTick();
449 DisconnectAllComponents();
450}
451
452inline bool Circuit::AddComponent( const Component::SPtr& component )
453{
454 if ( !component || _componentsSet.find( component ) != _componentsSet.end() )
455 {
456 return false;
457 }
458
459 // components within the circuit need to have as many buffers as there are threads in the circuit
460 component->SetBufferCount( _bufferCount, _currentBuffer );
461
462 PauseAutoTick();
463 _components.emplace_back( component.get() );
464 _componentsParallel.emplace_back( component.get() );
465 ResumeAutoTick();
466
467 _componentsSet.emplace( component );
468
469 return true;
470}
471
472inline bool Circuit::RemoveComponent( const Component::SPtr& component )
473{
474 if ( _componentsSet.find( component ) == _componentsSet.end() )
475 {
476 return false;
477 }
478
479 auto findFn = [&component]( auto comp ) { return comp == component.get(); };
480
481 if ( auto it = std::find_if( _components.begin(), _components.end(), findFn ); it != _components.end() )
482 {
483 PauseAutoTick();
484
485 DisconnectComponent( component );
486
487 _components.erase( it );
488
489 ResumeAutoTick();
490
491 _componentsSet.erase( component );
492
493 return true;
494 }
495
496 return false;
497}
498
499// cppcheck-suppress unusedFunction
500inline void Circuit::RemoveAllComponents()
501{
502 PauseAutoTick();
503
504 DisconnectAllComponents();
505
506 _components.clear();
507 _componentsParallel.clear();
508
509 ResumeAutoTick();
510
511 _componentsSet.clear();
512}
513
514inline int Circuit::GetComponentCount() const
515{
516 return (int)_components.size();
517}
518
519inline bool Circuit::ConnectOutToIn( const Component::SPtr& fromComponent,
520 int fromOutput,
521 const Component::SPtr& toComponent,
522 int toInput )
523{
524 if ( _componentsSet.find( fromComponent ) == _componentsSet.end() ||
525 _componentsSet.find( toComponent ) == _componentsSet.end() )
526 {
527 return false;
528 }
529
530 PauseAutoTick();
531
532 bool result = toComponent->ConnectInput( fromComponent, fromOutput, toInput );
533
534 _circuitDirty = result;
535
536 ResumeAutoTick();
537
538 return result;
539}
540
541inline bool Circuit::DisconnectComponent( const Component::SPtr& component )
542{
543 if ( _componentsSet.find( component ) == _componentsSet.end() )
544 {
545 return false;
546 }
547
548 PauseAutoTick();
549
550 component->DisconnectAllInputs();
551
552 // remove any connections this component has to other components
553 for ( auto comp : _components )
554 {
555 comp->DisconnectInput( component );
556 }
557
558 _circuitDirty = true;
559
560 ResumeAutoTick();
561
562 return true;
563}
564
565inline void Circuit::DisconnectAllComponents()
566{
567 PauseAutoTick();
568
569 for ( auto component : _components )
570 {
571 component->DisconnectAllInputs();
572 }
573
574 ResumeAutoTick();
575}
576
577inline void Circuit::SetBufferCount( int bufferCount )
578{
579 PauseAutoTick();
580
581 _bufferCount = bufferCount;
582
583 // stop all threads
584 for ( auto& circuitThread : _circuitThreads )
585 {
586 circuitThread.Stop();
587 }
588
589 // resize thread array
590 if ( _threadCount != 0 )
591 {
592 _circuitThreads.resize( 0 );
593 SetThreadCount( _threadCount );
594 }
595 else
596 {
597 _circuitThreads.resize( _bufferCount );
598
599 // initialise and start all threads
600 for ( int i = 0; i < _bufferCount; ++i )
601 {
602 _circuitThreads[i].Start( &_components, i );
603 }
604 }
605
606 if ( _currentBuffer >= _bufferCount )
607 {
608 _currentBuffer = 0;
609 }
610
611 // set all components to the new buffer count
612 for ( auto component : _components )
613 {
614 component->SetBufferCount( _bufferCount, _currentBuffer );
615 }
616
617 ResumeAutoTick();
618}
619
620inline int Circuit::GetBufferCount() const
621{
622 return _bufferCount;
623}
624
625inline void Circuit::SetThreadCount( int threadCount )
626{
627 PauseAutoTick();
628
629 if ( _threadCount == 0 && threadCount != 0 )
630 {
631 _circuitDirty = true;
632 }
633
634 _threadCount = threadCount;
635
636 // stop all threads
637 for ( auto& circuitThreads : _circuitThreadsParallel )
638 {
639 for ( auto& circuitThread : circuitThreads )
640 {
641 circuitThread.Stop();
642 }
643 }
644
645 // resize thread array
646 if ( _threadCount == 0 )
647 {
648 _circuitThreadsParallel.resize( 0 );
649 SetBufferCount( _bufferCount );
650 }
651 else
652 {
653 _circuitThreadsParallel.resize( _bufferCount == 0 ? 1 : _bufferCount );
654 for ( auto& circuitThread : _circuitThreadsParallel )
655 {
656 circuitThread.resize( _threadCount );
657 }
658
659 // initialise and start all threads
660 int i = 0;
661 for ( auto& circuitThreads : _circuitThreadsParallel )
662 {
663 int j = 0;
664 for ( auto& circuitThread : circuitThreads )
665 {
666 circuitThread.Start( &_componentsParallel, i, j++, _threadCount );
667 }
668 ++i;
669 }
670 }
671
672 ResumeAutoTick();
673}
674
675// cppcheck-suppress unusedFunction
676inline int Circuit::GetThreadCount() const
677{
678 return _threadCount;
679}
680
681inline void Circuit::Tick()
682{
683 if ( _circuitDirty )
684 {
685 _Optimize();
686 }
687
688 // process in a single thread if this circuit has no threads
689 // =========================================================
690 if ( _bufferCount == 0 && _threadCount == 0 )
691 {
692 // tick all internal components
693 for ( auto component : _components )
694 {
695 component->Tick( 0 );
696 }
697
698 return;
699 }
700 // process in multiple threads if this circuit has threads
701 // =======================================================
702 else if ( _threadCount != 0 )
703 {
704 auto& circuitThreads = _circuitThreadsParallel[_currentBuffer];
705
706 for ( auto& circuitThread : circuitThreads )
707 {
708 circuitThread.Sync();
709 }
710 for ( auto& circuitThread : circuitThreads )
711 {
712 circuitThread.Resume();
713 }
714 }
715 else
716 {
717 _circuitThreads[_currentBuffer].SyncAndResume(); // sync and resume thread x
718 }
719
720 if ( _bufferCount != 0 && ++_currentBuffer == _bufferCount )
721 {
722 _currentBuffer = 0;
723 }
724}
725
726inline void Circuit::Sync()
727{
728 // sync all threads
729 for ( auto& circuitThread : _circuitThreads )
730 {
731 circuitThread.Sync();
732 }
733 for ( auto& circuitThreads : _circuitThreadsParallel )
734 {
735 for ( auto& circuitThread : circuitThreads )
736 {
737 circuitThread.Sync();
738 }
739 }
740}
741
742inline void Circuit::StartAutoTick()
743{
744 _autoTickThread.Start( this );
745}
746
747inline void Circuit::StopAutoTick()
748{
749 _autoTickThread.Stop();
750 Sync();
751}
752
753inline void Circuit::PauseAutoTick()
754{
755 _autoTickThread.Pause();
756 Sync();
757}
758
759inline void Circuit::ResumeAutoTick()
760{
761 _autoTickThread.Resume();
762}
763
764inline void Circuit::Optimize()
765{
766 if ( _circuitDirty )
767 {
768 PauseAutoTick();
769 _Optimize();
770 ResumeAutoTick();
771 }
772}
773
774inline void Circuit::_Optimize()
775{
776 // scan for optimal series order -> update _components
777 {
778 std::vector<DSPatch::Component*> orderedComponents;
779 orderedComponents.reserve( _components.size() );
780
781 for ( auto component : _components )
782 {
783 component->Scan( orderedComponents );
784 }
785 for ( auto component : _components )
786 {
787 component->EndScan();
788 }
789
790 _components = std::move( orderedComponents );
791 }
792
793 // scan for optimal parallel order -> update _componentsParallel
794 if ( _threadCount != 0 )
795 {
796 std::vector<std::vector<DSPatch::Component*>> componentsMap;
797 componentsMap.reserve( _components.size() );
798
799 int scanPosition;
800 for ( int i = (int)_components.size() - 1; i >= 0; --i )
801 {
802 _components[i]->ScanParallel( componentsMap, scanPosition );
803 }
804 for ( auto component : _components )
805 {
806 component->EndScan();
807 }
808
809 _componentsParallel.clear();
810 _componentsParallel.reserve( _components.size() );
811 for ( auto& componentsMapEntry : componentsMap )
812 {
813 _componentsParallel.insert( _componentsParallel.end(), componentsMapEntry.begin(), componentsMapEntry.end() );
814 }
815 }
816
817 // clear _circuitDirty flag
818 _circuitDirty = false;
819}
820
821} // namespace DSPatch
Workspace for adding and routing components.
Definition Circuit.h:73