DSPatch v.11.4.3
Loading...
Searching...
No Matches
Circuit.h
1/******************************************************************************
2DSPatch - The Refreshingly Simple C++ Dataflow Framework
3Copyright (c) 2025, Marcus Tomlinson
4
5BSD 2-Clause License
6
7Redistribution and use in source and binary forms, with or without
8modification, are permitted provided that the following conditions are met:
9
101. Redistributions of source code must retain the above copyright notice, this
11 list of conditions and the following disclaimer.
12
132. Redistributions in binary form must reproduce the above copyright notice,
14 this list of conditions and the following disclaimer in the documentation
15 and/or other materials provided with the distribution.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27******************************************************************************/
28
29#pragma once
30
31#include "Component.h"
32
33#ifdef _WIN32
34#define WIN32_LEAN_AND_MEAN
35#include <windows.h>
36#undef WIN32_LEAN_AND_MEAN
37#endif
38
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <vector>
43
44namespace DSPatch
45{
46
48
72class Circuit final
73{
74public:
75 Circuit( const Circuit& ) = delete;
76 Circuit& operator=( const Circuit& ) = delete;
77
78 Circuit();
79 ~Circuit();
80
81 bool AddComponent( const Component::SPtr& component );
82
83 bool RemoveComponent( const Component::SPtr& component );
84 void RemoveAllComponents();
85
86 int GetComponentCount() const;
87
88 bool ConnectOutToIn( const Component::SPtr& fromComponent, int fromOutput, const Component::SPtr& toComponent, int toInput );
89
90 bool DisconnectComponent( const Component::SPtr& component );
91 void DisconnectAllComponents();
92
93 void SetBufferCount( int bufferCount );
94 int GetBufferCount() const;
95
96 void SetThreadCount( int threadCount );
97 int GetThreadCount() const;
98
99 void Tick();
100 void Sync();
101
102 void StartAutoTick();
103 void StopAutoTick();
104 void PauseAutoTick();
105 void ResumeAutoTick();
106
107 void Optimize();
108
109private:
110 class AutoTickThread final
111 {
112 public:
113 AutoTickThread( const AutoTickThread& ) = delete;
114 AutoTickThread& operator=( const AutoTickThread& ) = delete;
115
116 inline AutoTickThread() = default;
117
118 inline ~AutoTickThread()
119 {
120 Stop();
121 }
122
123 inline void Start( DSPatch::Circuit* circuit )
124 {
125 if ( !_stopped )
126 {
127 Resume();
128 return;
129 }
130
131 _circuit = circuit;
132
133 _stop = false;
134 _stopped = false;
135 _pause = false;
136
137 _thread = std::thread( &AutoTickThread::_Run, this );
138 }
139
140 inline void Stop()
141 {
142 _stop = true;
143 _pause = true;
144
145 if ( _thread.joinable() )
146 {
147 _thread.join();
148 }
149 }
150
151 inline void Pause()
152 {
153 if ( !_stopped && ++pauseCount == 1 )
154 {
155 std::unique_lock<std::mutex> lock( _resumeMutex );
156 _pause = true;
157 _pauseCondt.wait( lock ); // wait for pause
158 }
159 }
160
161 inline void Resume()
162 {
163 if ( _pause && --pauseCount == 0 )
164 {
165 _pause = false;
166 _resumeCondt.notify_all();
167 std::this_thread::yield();
168 }
169 }
170
171 private:
172 inline void _Run()
173 {
174 if ( _circuit )
175 {
176 while ( true )
177 {
178 _circuit->Tick();
179
180 if ( _pause )
181 {
182 if ( _stop )
183 {
184 break;
185 }
186
187 std::unique_lock<std::mutex> lock( _resumeMutex );
188
189 _pauseCondt.notify_all();
190 _resumeCondt.wait( lock ); // wait for resume
191 }
192 }
193 }
194
195 _stopped = true;
196 }
197
198 std::thread _thread;
199 DSPatch::Circuit* _circuit = nullptr;
200 int pauseCount = 0;
201 bool _stop = false;
202 bool _pause = false;
203 bool _stopped = true;
204 std::mutex _resumeMutex;
205 std::condition_variable _resumeCondt, _pauseCondt;
206 };
207
208 class CircuitThread final
209 {
210 public:
211 CircuitThread( const CircuitThread& ) = delete;
212 CircuitThread& operator=( const CircuitThread& ) = delete;
213
214 inline CircuitThread() = default;
215
216 // cppcheck-suppress missingMemberCopy
217 inline CircuitThread( CircuitThread&& )
218 {
219 }
220
221 inline ~CircuitThread()
222 {
223 Stop();
224 }
225
226 inline void Start( std::vector<DSPatch::Component*>* components, int bufferNo, int bufferCount )
227 {
228 _components = components;
229 _bufferNo = bufferNo;
230 _loneBuffer = bufferCount <= 1;
231
232 _stop = false;
233 _gotSync = false;
234
235 _thread = std::thread( &CircuitThread::_Run, this );
236 }
237
238 inline void Stop()
239 {
240 _stop = true;
241
242 Resume();
243
244 if ( _thread.joinable() )
245 {
246 _thread.join();
247 }
248 }
249
250 inline void Sync()
251 {
252 std::unique_lock<std::mutex> lock( _syncMutex );
253
254 if ( !_gotSync ) // if haven't already got sync
255 {
256 _syncCondt.wait( lock ); // wait for sync
257 }
258 }
259
260 inline void Resume()
261 {
262 _gotSync = false; // reset the sync flag
263 _resumeCondt.notify_all();
264 std::this_thread::yield();
265 }
266
267 inline void SyncAndResume()
268 {
269 Sync();
270 Resume();
271 }
272
273 private:
274 inline void _Run()
275 {
276#ifdef _WIN32
277 SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_HIGHEST );
278#else
279 sched_param sch_params;
280 sch_params.sched_priority = sched_get_priority_max( SCHED_RR );
281 pthread_setschedparam( pthread_self(), SCHED_RR, &sch_params );
282#endif
283
284 if ( _components )
285 {
286 while ( true )
287 {
288 {
289 std::unique_lock<std::mutex> lock( _syncMutex );
290
291 _gotSync = true; // set the sync flag
292 _syncCondt.notify_all();
293 _resumeCondt.wait( lock ); // wait for resume
294 }
295
296 if ( _stop )
297 {
298 break;
299 }
300
301 // You might be thinking: Can't we have each thread start on a different component?
302
303 // Well no. In order to maintain synchronisation within the circuit, when a component
304 // wants to process its buffers in-order, it requires that every other in-order
305 // component in the system has not only processed its buffers in the same order, but
306 // has processed the same number of buffers too.
307
308 // E.g. 1,2,3 and 1,2,3. Not 1,2,3 and 2,3,1,2,3.
309
310 if ( _loneBuffer )
311 {
312 for ( auto component : *_components )
313 {
314 component->Tick();
315 }
316 }
317 else
318 {
319 for ( auto component : *_components )
320 {
321 component->Tick( _bufferNo );
322 }
323 }
324 }
325 }
326 }
327
328 std::thread _thread;
329 std::vector<DSPatch::Component*>* _components = nullptr;
330 int _bufferNo = 0;
331 bool _loneBuffer = false;
332 bool _stop = false;
333 bool _gotSync = false;
334 std::mutex _syncMutex;
335 std::condition_variable _resumeCondt, _syncCondt;
336 };
337
338 class CircuitThreadParallel final
339 {
340 public:
341 CircuitThreadParallel( const CircuitThreadParallel& ) = delete;
342 CircuitThreadParallel& operator=( const CircuitThreadParallel& ) = delete;
343
344 inline CircuitThreadParallel() = default;
345
346 // cppcheck-suppress missingMemberCopy
347 inline CircuitThreadParallel( CircuitThreadParallel&& )
348 {
349 }
350
351 inline ~CircuitThreadParallel()
352 {
353 Stop();
354 }
355
356 inline void Start(
357 std::vector<DSPatch::Component*>* components, int bufferNo, int bufferCount, int threadNo, int threadCount )
358 {
359 _components = components;
360 _bufferNo = bufferNo;
361 _loneBuffer = bufferCount <= 1;
362 _threadNo = threadNo;
363 _threadCount = threadCount;
364
365 _stop = false;
366 _gotSync = false;
367
368 _thread = std::thread( &CircuitThreadParallel::_Run, this );
369 }
370
371 inline void Stop()
372 {
373 _stop = true;
374
375 Resume();
376
377 if ( _thread.joinable() )
378 {
379 _thread.join();
380 }
381 }
382
383 inline void Sync()
384 {
385 std::unique_lock<std::mutex> lock( _syncMutex );
386
387 if ( !_gotSync ) // if haven't already got sync
388 {
389 _syncCondt.wait( lock ); // wait for sync
390 }
391 }
392
393 inline void Resume()
394 {
395 _gotSync = false; // reset the sync flag
396 _resumeCondt.notify_all();
397 std::this_thread::yield();
398 }
399
400 private:
401 inline void _Run()
402 {
403#ifdef _WIN32
404 SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_HIGHEST );
405#else
406 sched_param sch_params;
407 sch_params.sched_priority = sched_get_priority_max( SCHED_RR );
408 pthread_setschedparam( pthread_self(), SCHED_RR, &sch_params );
409#endif
410
411 if ( _components )
412 {
413 while ( true )
414 {
415 {
416 std::unique_lock<std::mutex> lock( _syncMutex );
417
418 _gotSync = true; // set the sync flag
419 _syncCondt.notify_all();
420 _resumeCondt.wait( lock ); // wait for resume
421 }
422
423 if ( _stop )
424 {
425 break;
426 }
427
428 if ( _loneBuffer )
429 {
430 for ( auto it = _components->begin() + _threadNo; it < _components->end(); it += _threadCount )
431 {
432 ( *it )->TickParallel();
433 }
434 }
435 else
436 {
437 for ( auto it = _components->begin() + _threadNo; it < _components->end(); it += _threadCount )
438 {
439 ( *it )->TickParallel( _bufferNo );
440 }
441 }
442 }
443 }
444 }
445
446 std::thread _thread;
447 std::vector<DSPatch::Component*>* _components = nullptr;
448 int _bufferNo = 0;
449 bool _loneBuffer = false;
450 int _threadNo = 0;
451 int _threadCount = 0;
452 bool _stop = false;
453 bool _gotSync = false;
454 std::mutex _syncMutex;
455 std::condition_variable _resumeCondt, _syncCondt;
456 };
457
458 void _Optimize();
459
460 int _bufferCount = 0;
461 int _threadCount = 0;
462 int _currentBuffer = 0;
463
464 AutoTickThread _autoTickThread;
465
466 std::unordered_set<DSPatch::Component::SPtr> _componentsSet;
467
468 std::vector<DSPatch::Component*> _components;
469 std::vector<DSPatch::Component*> _componentsParallel;
470
471 std::vector<CircuitThread> _circuitThreads;
472 std::vector<std::vector<CircuitThreadParallel>> _circuitThreadsParallel;
473
474 bool _circuitDirty = false;
475};
476
477inline Circuit::Circuit() = default;
478
479inline Circuit::~Circuit()
480{
481 StopAutoTick();
482 DisconnectAllComponents();
483}
484
485inline bool Circuit::AddComponent( const Component::SPtr& component )
486{
487 if ( !component || _componentsSet.find( component ) != _componentsSet.end() )
488 {
489 return false;
490 }
491
492 // components within the circuit need to have as many buffers as there are threads in the circuit
493 component->SetBufferCount( _bufferCount, _currentBuffer );
494
495 PauseAutoTick();
496 _components.emplace_back( component.get() );
497 _componentsParallel.emplace_back( component.get() );
498 ResumeAutoTick();
499
500 _componentsSet.emplace( component );
501
502 return true;
503}
504
505inline bool Circuit::RemoveComponent( const Component::SPtr& component )
506{
507 if ( _componentsSet.find( component ) == _componentsSet.end() )
508 {
509 return false;
510 }
511
512 auto findFn = [&component]( auto comp ) { return comp == component.get(); };
513
514 if ( auto it = std::find_if( _components.begin(), _components.end(), findFn ); it != _components.end() )
515 {
516 PauseAutoTick();
517
518 DisconnectComponent( component );
519
520 _components.erase( it );
521
522 ResumeAutoTick();
523
524 _componentsSet.erase( component );
525
526 return true;
527 }
528
529 return false;
530}
531
532// cppcheck-suppress unusedFunction
533inline void Circuit::RemoveAllComponents()
534{
535 PauseAutoTick();
536
537 DisconnectAllComponents();
538
539 _components.clear();
540 _componentsParallel.clear();
541
542 ResumeAutoTick();
543
544 _componentsSet.clear();
545}
546
547inline int Circuit::GetComponentCount() const
548{
549 return (int)_components.size();
550}
551
552inline bool Circuit::ConnectOutToIn( const Component::SPtr& fromComponent,
553 int fromOutput,
554 const Component::SPtr& toComponent,
555 int toInput )
556{
557 if ( _componentsSet.find( fromComponent ) == _componentsSet.end() ||
558 _componentsSet.find( toComponent ) == _componentsSet.end() )
559 {
560 return false;
561 }
562
563 PauseAutoTick();
564
565 bool result = toComponent->ConnectInput( fromComponent, fromOutput, toInput );
566
567 _circuitDirty = result;
568
569 ResumeAutoTick();
570
571 return result;
572}
573
574inline bool Circuit::DisconnectComponent( const Component::SPtr& component )
575{
576 if ( _componentsSet.find( component ) == _componentsSet.end() )
577 {
578 return false;
579 }
580
581 PauseAutoTick();
582
583 component->DisconnectAllInputs();
584
585 // remove any connections this component has to other components
586 for ( auto comp : _components )
587 {
588 comp->DisconnectInput( component );
589 }
590
591 _circuitDirty = true;
592
593 ResumeAutoTick();
594
595 return true;
596}
597
598inline void Circuit::DisconnectAllComponents()
599{
600 PauseAutoTick();
601
602 for ( auto component : _components )
603 {
604 component->DisconnectAllInputs();
605 }
606
607 ResumeAutoTick();
608}
609
610inline void Circuit::SetBufferCount( int bufferCount )
611{
612 PauseAutoTick();
613
614 _bufferCount = bufferCount;
615
616 // stop all threads
617 for ( auto& circuitThread : _circuitThreads )
618 {
619 circuitThread.Stop();
620 }
621
622 // resize thread array
623 if ( _threadCount != 0 )
624 {
625 _circuitThreads.resize( 0 );
626 SetThreadCount( _threadCount );
627 }
628 else
629 {
630 _circuitThreads.resize( _bufferCount );
631
632 // initialise and start all threads
633 for ( int i = 0; i < _bufferCount; ++i )
634 {
635 _circuitThreads[i].Start( &_components, i, _bufferCount );
636 }
637 }
638
639 if ( _currentBuffer >= _bufferCount )
640 {
641 _currentBuffer = 0;
642 }
643
644 // set all components to the new buffer count
645 for ( auto component : _components )
646 {
647 component->SetBufferCount( _bufferCount, _currentBuffer );
648 }
649
650 ResumeAutoTick();
651}
652
653inline int Circuit::GetBufferCount() const
654{
655 return _bufferCount;
656}
657
658inline void Circuit::SetThreadCount( int threadCount )
659{
660 PauseAutoTick();
661
662 if ( _threadCount == 0 && threadCount != 0 )
663 {
664 _circuitDirty = true;
665 }
666
667 _threadCount = threadCount;
668
669 // stop all threads
670 for ( auto& circuitThreads : _circuitThreadsParallel )
671 {
672 for ( auto& circuitThread : circuitThreads )
673 {
674 circuitThread.Stop();
675 }
676 }
677
678 // resize thread array
679 if ( _threadCount == 0 )
680 {
681 _circuitThreadsParallel.resize( 0 );
682 SetBufferCount( _bufferCount );
683 }
684 else
685 {
686 _circuitThreadsParallel.resize( _bufferCount == 0 ? 1 : _bufferCount );
687 for ( auto& circuitThread : _circuitThreadsParallel )
688 {
689 circuitThread.resize( _threadCount );
690 }
691
692 // initialise and start all threads
693 int i = 0;
694 for ( auto& circuitThreads : _circuitThreadsParallel )
695 {
696 int j = 0;
697 for ( auto& circuitThread : circuitThreads )
698 {
699 circuitThread.Start( &_componentsParallel, i, _bufferCount, j++, _threadCount );
700 }
701 ++i;
702 }
703 }
704
705 ResumeAutoTick();
706}
707
708// cppcheck-suppress unusedFunction
709inline int Circuit::GetThreadCount() const
710{
711 return _threadCount;
712}
713
714inline void Circuit::Tick()
715{
716 if ( _circuitDirty )
717 {
718 _Optimize();
719 }
720
721 // process in multiple threads if this circuit has threads
722 // =======================================================
723 if ( _threadCount != 0 )
724 {
725 auto& circuitThreads = _circuitThreadsParallel[_currentBuffer];
726
727 for ( auto& circuitThread : circuitThreads )
728 {
729 circuitThread.Sync();
730 }
731 for ( auto& circuitThread : circuitThreads )
732 {
733 circuitThread.Resume();
734 }
735 }
736 // process in a single thread if this circuit has no threads
737 // =========================================================
738 else if ( _bufferCount == 0 )
739 {
740 // tick all internal components
741 for ( auto component : _components )
742 {
743 component->Tick();
744 }
745
746 return;
747 }
748 else
749 {
750 _circuitThreads[_currentBuffer].SyncAndResume(); // sync and resume thread x
751 }
752
753 if ( _bufferCount != 0 && ++_currentBuffer == _bufferCount )
754 {
755 _currentBuffer = 0;
756 }
757}
758
759inline void Circuit::Sync()
760{
761 // sync all threads
762 for ( auto& circuitThread : _circuitThreads )
763 {
764 circuitThread.Sync();
765 }
766 for ( auto& circuitThreads : _circuitThreadsParallel )
767 {
768 for ( auto& circuitThread : circuitThreads )
769 {
770 circuitThread.Sync();
771 }
772 }
773}
774
775inline void Circuit::StartAutoTick()
776{
777 _autoTickThread.Start( this );
778}
779
780inline void Circuit::StopAutoTick()
781{
782 _autoTickThread.Stop();
783 Sync();
784}
785
786inline void Circuit::PauseAutoTick()
787{
788 _autoTickThread.Pause();
789 Sync();
790}
791
792inline void Circuit::ResumeAutoTick()
793{
794 _autoTickThread.Resume();
795}
796
797inline void Circuit::Optimize()
798{
799 if ( _circuitDirty )
800 {
801 PauseAutoTick();
802 _Optimize();
803 ResumeAutoTick();
804 }
805}
806
807inline void Circuit::_Optimize()
808{
809 // scan for optimal series order -> update _components
810 std::vector<DSPatch::Component*> orderedComponents;
811 orderedComponents.reserve( _components.size() );
812
813 for ( auto component : _components )
814 {
815 component->Scan( orderedComponents );
816 }
817 for ( auto component : _components )
818 {
819 component->EndScan();
820 }
821
822 _components = std::move( orderedComponents );
823
824 // scan for optimal parallel order -> update _componentsParallel
825 if ( _threadCount != 0 )
826 {
827 std::vector<std::vector<DSPatch::Component*>> componentsMap;
828 componentsMap.reserve( _components.size() );
829
830 int scanPosition;
831 for ( auto component : _components )
832 {
833 component->ScanParallel( componentsMap, scanPosition );
834 }
835 for ( auto component : _components )
836 {
837 component->EndScan();
838 }
839
840 _componentsParallel.clear();
841 _componentsParallel.reserve( _components.size() );
842 for ( auto& componentsMapEntry : componentsMap )
843 {
844 _componentsParallel.insert( _componentsParallel.end(), componentsMapEntry.begin(), componentsMapEntry.end() );
845 }
846 }
847
848 // clear _circuitDirty flag
849 _circuitDirty = false;
850}
851
852} // namespace DSPatch
Workspace for adding and routing components.
Definition Circuit.h:73