DSPatch v.11.3.1
Loading...
Searching...
No Matches
Circuit.h
1/******************************************************************************
2DSPatch - The Refreshingly Simple C++ Dataflow Framework
3Copyright (c) 2024, Marcus Tomlinson
4
5BSD 2-Clause License
6
7Redistribution and use in source and binary forms, with or without
8modification, are permitted provided that the following conditions are met:
9
101. Redistributions of source code must retain the above copyright notice, this
11 list of conditions and the following disclaimer.
12
132. Redistributions in binary form must reproduce the above copyright notice,
14 this list of conditions and the following disclaimer in the documentation
15 and/or other materials provided with the distribution.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27******************************************************************************/
28
29#pragma once
30
31#include "Component.h"
32
33#ifdef _WIN32
34#define WIN32_LEAN_AND_MEAN
35#include <windows.h>
36#undef WIN32_LEAN_AND_MEAN
37#endif
38
39#include <algorithm>
40#include <condition_variable>
41#include <thread>
42#include <unordered_set>
43
44namespace DSPatch
45{
46
48
72class Circuit final
73{
74public:
75 Circuit( const Circuit& ) = delete;
76 Circuit& operator=( const Circuit& ) = delete;
77
78 Circuit();
79 ~Circuit();
80
81 bool AddComponent( const Component::SPtr& component );
82
83 bool RemoveComponent( const Component::SPtr& component );
84 void RemoveAllComponents();
85
86 int GetComponentCount() const;
87
88 bool ConnectOutToIn( const Component::SPtr& fromComponent, int fromOutput, const Component::SPtr& toComponent, int toInput );
89
90 bool DisconnectComponent( const Component::SPtr& component );
91 void DisconnectAllComponents();
92
93 void SetBufferCount( int bufferCount );
94 int GetBufferCount() const;
95
96 void SetThreadCount( int threadCount );
97 int GetThreadCount() const;
98
99 void Tick();
100 void Sync();
101
102 void StartAutoTick();
103 void StopAutoTick();
104 void PauseAutoTick();
105 void ResumeAutoTick();
106
107 void Optimize();
108
109private:
110 class AutoTickThread final
111 {
112 public:
113 AutoTickThread( const AutoTickThread& ) = delete;
114 AutoTickThread& operator=( const AutoTickThread& ) = delete;
115
116 inline AutoTickThread() = default;
117
118 inline ~AutoTickThread()
119 {
120 Stop();
121 }
122
123 inline void Start( DSPatch::Circuit* circuit )
124 {
125 if ( !_stopped )
126 {
127 Resume();
128 return;
129 }
130
131 _circuit = circuit;
132
133 _stop = false;
134 _stopped = false;
135 _pause = false;
136
137 _thread = std::thread( &AutoTickThread::_Run, this );
138 }
139
140 inline void Stop()
141 {
142 _stop = true;
143 _pause = true;
144
145 if ( _thread.joinable() )
146 {
147 _thread.join();
148 }
149 }
150
151 inline void Pause()
152 {
153 if ( !_stopped && ++pauseCount == 1 )
154 {
155 std::unique_lock<std::mutex> lock( _resumeMutex );
156 _pause = true;
157 _pauseCondt.wait( lock ); // wait for pause
158 }
159 }
160
161 inline void Resume()
162 {
163 if ( _pause && --pauseCount == 0 )
164 {
165 _pause = false;
166 _resumeCondt.notify_all();
167 std::this_thread::yield();
168 }
169 }
170
171 private:
172 inline void _Run()
173 {
174 if ( _circuit )
175 {
176 while ( true )
177 {
178 _circuit->Tick();
179
180 if ( _pause )
181 {
182 if ( _stop )
183 {
184 break;
185 }
186
187 std::unique_lock<std::mutex> lock( _resumeMutex );
188
189 _pauseCondt.notify_all();
190 _resumeCondt.wait( lock ); // wait for resume
191 }
192 }
193 }
194
195 _stopped = true;
196 }
197
198 std::thread _thread;
199 DSPatch::Circuit* _circuit = nullptr;
200 int pauseCount = 0;
201 bool _stop = false;
202 bool _pause = false;
203 bool _stopped = true;
204 std::mutex _resumeMutex;
205 std::condition_variable _resumeCondt, _pauseCondt;
206 };
207
208 class CircuitThread final
209 {
210 public:
211 CircuitThread( const CircuitThread& ) = delete;
212 CircuitThread& operator=( const CircuitThread& ) = delete;
213
214 inline CircuitThread() = default;
215
216 // cppcheck-suppress missingMemberCopy
217 inline CircuitThread( CircuitThread&& )
218 {
219 }
220
221 inline ~CircuitThread()
222 {
223 Stop();
224 }
225
226 inline void Start( std::vector<DSPatch::Component*>* components, int bufferNo )
227 {
228 _components = components;
229 _bufferNo = bufferNo;
230
231 _stop = false;
232 _gotSync = false;
233
234 _thread = std::thread( &CircuitThread::_Run, this );
235 }
236
237 inline void Stop()
238 {
239 _stop = true;
240
241 Resume();
242
243 if ( _thread.joinable() )
244 {
245 _thread.join();
246 }
247 }
248
249 inline void Sync()
250 {
251 std::unique_lock<std::mutex> lock( _syncMutex );
252
253 if ( !_gotSync ) // if haven't already got sync
254 {
255 _syncCondt.wait( lock ); // wait for sync
256 }
257 }
258
259 inline void Resume()
260 {
261 _gotSync = false; // reset the sync flag
262 _resumeCondt.notify_all();
263 std::this_thread::yield();
264 }
265
266 inline void SyncAndResume()
267 {
268 Sync();
269 Resume();
270 }
271
272 private:
273 inline void _Run()
274 {
275#ifdef _WIN32
276 SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_HIGHEST );
277#else
278 sched_param sch_params;
279 sch_params.sched_priority = sched_get_priority_max( SCHED_RR );
280 pthread_setschedparam( pthread_self(), SCHED_RR, &sch_params );
281#endif
282
283 if ( _components )
284 {
285 while ( true )
286 {
287 {
288 std::unique_lock<std::mutex> lock( _syncMutex );
289
290 _gotSync = true; // set the sync flag
291 _syncCondt.notify_all();
292 _resumeCondt.wait( lock ); // wait for resume
293 }
294
295 if ( _stop )
296 {
297 break;
298 }
299
300 // You might be thinking: Can't we have each thread start on a different component?
301
302 // Well no. In order to maintain synchronisation within the circuit, when a component
303 // wants to process its buffers in-order, it requires that every other in-order
304 // component in the system has not only processed its buffers in the same order, but
305 // has processed the same number of buffers too.
306
307 // E.g. 1,2,3 and 1,2,3. Not 1,2,3 and 2,3,1,2,3.
308
309 for ( auto component : *_components )
310 {
311 component->Tick( _bufferNo );
312 }
313 }
314 }
315 }
316
317 std::thread _thread;
318 std::vector<DSPatch::Component*>* _components = nullptr;
319 int _bufferNo = 0;
320 bool _stop = false;
321 bool _gotSync = false;
322 std::mutex _syncMutex;
323 std::condition_variable _resumeCondt, _syncCondt;
324 };
325
326 class CircuitThreadParallel final
327 {
328 public:
329 CircuitThreadParallel( const CircuitThreadParallel& ) = delete;
330 CircuitThreadParallel& operator=( const CircuitThreadParallel& ) = delete;
331
332 inline CircuitThreadParallel() = default;
333
334 // cppcheck-suppress missingMemberCopy
335 inline CircuitThreadParallel( CircuitThreadParallel&& )
336 {
337 }
338
339 inline ~CircuitThreadParallel()
340 {
341 Stop();
342 }
343
344 inline void Start( std::vector<DSPatch::Component*>* components, int bufferNo, int threadNo, int threadCount )
345 {
346 _components = components;
347 _bufferNo = bufferNo;
348 _threadNo = threadNo;
349 _threadCount = threadCount;
350
351 _stop = false;
352 _gotSync = false;
353
354 _thread = std::thread( &CircuitThreadParallel::_Run, this );
355 }
356
357 inline void Stop()
358 {
359 _stop = true;
360
361 Resume();
362
363 if ( _thread.joinable() )
364 {
365 _thread.join();
366 }
367 }
368
369 inline void Sync()
370 {
371 std::unique_lock<std::mutex> lock( _syncMutex );
372
373 if ( !_gotSync ) // if haven't already got sync
374 {
375 _syncCondt.wait( lock ); // wait for sync
376 }
377 }
378
379 inline void Resume()
380 {
381 _gotSync = false; // reset the sync flag
382 _resumeCondt.notify_all();
383 std::this_thread::yield();
384 }
385
386 private:
387 inline void _Run()
388 {
389#ifdef _WIN32
390 SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_HIGHEST );
391#else
392 sched_param sch_params;
393 sch_params.sched_priority = sched_get_priority_max( SCHED_RR );
394 pthread_setschedparam( pthread_self(), SCHED_RR, &sch_params );
395#endif
396
397 if ( _components )
398 {
399 while ( true )
400 {
401 {
402 std::unique_lock<std::mutex> lock( _syncMutex );
403
404 _gotSync = true; // set the sync flag
405 _syncCondt.notify_all();
406 _resumeCondt.wait( lock ); // wait for resume
407 }
408
409 if ( _stop )
410 {
411 break;
412 }
413
414 for ( auto it = _components->begin() + _threadNo; it < _components->end(); it += _threadCount )
415 {
416 ( *it )->TickParallel( _bufferNo );
417 }
418 }
419 }
420 }
421
422 std::thread _thread;
423 std::vector<DSPatch::Component*>* _components = nullptr;
424 int _bufferNo = 0;
425 int _threadNo = 0;
426 int _threadCount = 0;
427 bool _stop = false;
428 bool _gotSync = false;
429 std::mutex _syncMutex;
430 std::condition_variable _resumeCondt, _syncCondt;
431 };
432
433 void _Optimize();
434
435 int _bufferCount = 0;
436 int _threadCount = 0;
437 int _currentBuffer = 0;
438
439 AutoTickThread _autoTickThread;
440
441 std::unordered_set<DSPatch::Component::SPtr> _componentsSet;
442
443 std::vector<DSPatch::Component*> _components;
444 std::vector<DSPatch::Component*> _componentsParallel;
445
446 std::vector<CircuitThread> _circuitThreads;
447 std::vector<std::vector<CircuitThreadParallel>> _circuitThreadsParallel;
448
449 bool _circuitDirty = false;
450};
451
452inline Circuit::Circuit() = default;
453
454inline Circuit::~Circuit()
455{
456 StopAutoTick();
457 DisconnectAllComponents();
458}
459
460inline bool Circuit::AddComponent( const Component::SPtr& component )
461{
462 if ( !component || _componentsSet.find( component ) != _componentsSet.end() )
463 {
464 return false;
465 }
466
467 // components within the circuit need to have as many buffers as there are threads in the circuit
468 component->SetBufferCount( _bufferCount, _currentBuffer );
469
470 PauseAutoTick();
471 _components.emplace_back( component.get() );
472 _componentsParallel.emplace_back( component.get() );
473 ResumeAutoTick();
474
475 _componentsSet.emplace( component );
476
477 return true;
478}
479
480inline bool Circuit::RemoveComponent( const Component::SPtr& component )
481{
482 if ( _componentsSet.find( component ) == _componentsSet.end() )
483 {
484 return false;
485 }
486
487 auto findFn = [&component]( auto comp ) { return comp == component.get(); };
488
489 if ( auto it = std::find_if( _components.begin(), _components.end(), findFn ); it != _components.end() )
490 {
491 PauseAutoTick();
492
493 DisconnectComponent( component );
494
495 _components.erase( it );
496
497 ResumeAutoTick();
498
499 _componentsSet.erase( component );
500
501 return true;
502 }
503
504 return false;
505}
506
507// cppcheck-suppress unusedFunction
508inline void Circuit::RemoveAllComponents()
509{
510 PauseAutoTick();
511
512 DisconnectAllComponents();
513
514 _components.clear();
515 _componentsParallel.clear();
516
517 ResumeAutoTick();
518
519 _componentsSet.clear();
520}
521
522inline int Circuit::GetComponentCount() const
523{
524 return (int)_components.size();
525}
526
527inline bool Circuit::ConnectOutToIn( const Component::SPtr& fromComponent,
528 int fromOutput,
529 const Component::SPtr& toComponent,
530 int toInput )
531{
532 if ( _componentsSet.find( fromComponent ) == _componentsSet.end() ||
533 _componentsSet.find( toComponent ) == _componentsSet.end() )
534 {
535 return false;
536 }
537
538 PauseAutoTick();
539
540 bool result = toComponent->ConnectInput( fromComponent, fromOutput, toInput );
541
542 _circuitDirty = result;
543
544 ResumeAutoTick();
545
546 return result;
547}
548
549inline bool Circuit::DisconnectComponent( const Component::SPtr& component )
550{
551 if ( _componentsSet.find( component ) == _componentsSet.end() )
552 {
553 return false;
554 }
555
556 PauseAutoTick();
557
558 component->DisconnectAllInputs();
559
560 // remove any connections this component has to other components
561 for ( auto comp : _components )
562 {
563 comp->DisconnectInput( component );
564 }
565
566 _circuitDirty = true;
567
568 ResumeAutoTick();
569
570 return true;
571}
572
573inline void Circuit::DisconnectAllComponents()
574{
575 PauseAutoTick();
576
577 for ( auto component : _components )
578 {
579 component->DisconnectAllInputs();
580 }
581
582 ResumeAutoTick();
583}
584
585inline void Circuit::SetBufferCount( int bufferCount )
586{
587 PauseAutoTick();
588
589 _bufferCount = bufferCount;
590
591 // stop all threads
592 for ( auto& circuitThread : _circuitThreads )
593 {
594 circuitThread.Stop();
595 }
596
597 // resize thread array
598 if ( _threadCount != 0 )
599 {
600 _circuitThreads.resize( 0 );
601 SetThreadCount( _threadCount );
602 }
603 else
604 {
605 _circuitThreads.resize( _bufferCount );
606
607 // initialise and start all threads
608 for ( int i = 0; i < _bufferCount; ++i )
609 {
610 _circuitThreads[i].Start( &_components, i );
611 }
612 }
613
614 if ( _currentBuffer >= _bufferCount )
615 {
616 _currentBuffer = 0;
617 }
618
619 // set all components to the new buffer count
620 for ( auto component : _components )
621 {
622 component->SetBufferCount( _bufferCount, _currentBuffer );
623 }
624
625 ResumeAutoTick();
626}
627
628inline int Circuit::GetBufferCount() const
629{
630 return _bufferCount;
631}
632
633inline void Circuit::SetThreadCount( int threadCount )
634{
635 PauseAutoTick();
636
637 if ( _threadCount == 0 && threadCount != 0 )
638 {
639 _circuitDirty = true;
640 }
641
642 _threadCount = threadCount;
643
644 // stop all threads
645 for ( auto& circuitThreads : _circuitThreadsParallel )
646 {
647 for ( auto& circuitThread : circuitThreads )
648 {
649 circuitThread.Stop();
650 }
651 }
652
653 // resize thread array
654 if ( _threadCount == 0 )
655 {
656 _circuitThreadsParallel.resize( 0 );
657 SetBufferCount( _bufferCount );
658 }
659 else
660 {
661 _circuitThreadsParallel.resize( _bufferCount == 0 ? 1 : _bufferCount );
662 for ( auto& circuitThread : _circuitThreadsParallel )
663 {
664 circuitThread.resize( _threadCount );
665 }
666
667 // initialise and start all threads
668 int i = 0;
669 for ( auto& circuitThreads : _circuitThreadsParallel )
670 {
671 int j = 0;
672 for ( auto& circuitThread : circuitThreads )
673 {
674 circuitThread.Start( &_componentsParallel, i, j++, _threadCount );
675 }
676 ++i;
677 }
678 }
679
680 ResumeAutoTick();
681}
682
683// cppcheck-suppress unusedFunction
684inline int Circuit::GetThreadCount() const
685{
686 return _threadCount;
687}
688
689inline void Circuit::Tick()
690{
691 if ( _circuitDirty )
692 {
693 _Optimize();
694 }
695
696 // process in multiple threads if this circuit has threads
697 // =======================================================
698 if ( _threadCount != 0 )
699 {
700 auto& circuitThreads = _circuitThreadsParallel[_currentBuffer];
701
702 for ( auto& circuitThread : circuitThreads )
703 {
704 circuitThread.Sync();
705 }
706 for ( auto& circuitThread : circuitThreads )
707 {
708 circuitThread.Resume();
709 }
710 }
711 // process in a single thread if this circuit has no threads
712 // =========================================================
713 else if ( _bufferCount == 0 )
714 {
715 // tick all internal components
716 for ( auto component : _components )
717 {
718 component->Tick( 0 );
719 }
720
721 return;
722 }
723 else
724 {
725 _circuitThreads[_currentBuffer].SyncAndResume(); // sync and resume thread x
726 }
727
728 if ( _bufferCount != 0 && ++_currentBuffer == _bufferCount )
729 {
730 _currentBuffer = 0;
731 }
732}
733
734inline void Circuit::Sync()
735{
736 // sync all threads
737 for ( auto& circuitThread : _circuitThreads )
738 {
739 circuitThread.Sync();
740 }
741 for ( auto& circuitThreads : _circuitThreadsParallel )
742 {
743 for ( auto& circuitThread : circuitThreads )
744 {
745 circuitThread.Sync();
746 }
747 }
748}
749
750inline void Circuit::StartAutoTick()
751{
752 _autoTickThread.Start( this );
753}
754
755inline void Circuit::StopAutoTick()
756{
757 _autoTickThread.Stop();
758 Sync();
759}
760
761inline void Circuit::PauseAutoTick()
762{
763 _autoTickThread.Pause();
764 Sync();
765}
766
767inline void Circuit::ResumeAutoTick()
768{
769 _autoTickThread.Resume();
770}
771
772inline void Circuit::Optimize()
773{
774 if ( _circuitDirty )
775 {
776 PauseAutoTick();
777 _Optimize();
778 ResumeAutoTick();
779 }
780}
781
782inline void Circuit::_Optimize()
783{
784 // scan for optimal series order -> update _components
785 {
786 std::vector<DSPatch::Component*> orderedComponents;
787 orderedComponents.reserve( _components.size() );
788
789 for ( auto component : _components )
790 {
791 component->Scan( orderedComponents );
792 }
793 for ( auto component : _components )
794 {
795 component->EndScan();
796 }
797
798 _components = std::move( orderedComponents );
799 }
800
801 // scan for optimal parallel order -> update _componentsParallel
802 if ( _threadCount != 0 )
803 {
804 std::vector<std::vector<DSPatch::Component*>> componentsMap;
805 componentsMap.reserve( _components.size() );
806
807 int scanPosition;
808 for ( auto component : _components )
809 {
810 component->ScanParallel( componentsMap, scanPosition );
811 }
812 for ( auto component : _components )
813 {
814 component->EndScan();
815 }
816
817 _componentsParallel.clear();
818 _componentsParallel.reserve( _components.size() );
819 for ( auto& componentsMapEntry : componentsMap )
820 {
821 _componentsParallel.insert( _componentsParallel.end(), componentsMapEntry.begin(), componentsMapEntry.end() );
822 }
823 }
824
825 // clear _circuitDirty flag
826 _circuitDirty = false;
827}
828
829} // namespace DSPatch
Workspace for adding and routing components.
Definition Circuit.h:73