DSPatch v.11.4.3
Loading...
Searching...
No Matches
Component.h
1/******************************************************************************
2DSPatch - The Refreshingly Simple C++ Dataflow Framework
3Copyright (c) 2025, Marcus Tomlinson
4
5BSD 2-Clause License
6
7Redistribution and use in source and binary forms, with or without
8modification, are permitted provided that the following conditions are met:
9
101. Redistributions of source code must retain the above copyright notice, this
11 list of conditions and the following disclaimer.
12
132. Redistributions in binary form must reproduce the above copyright notice,
14 this list of conditions and the following disclaimer in the documentation
15 and/or other materials provided with the distribution.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27******************************************************************************/
28
29#pragma once
30
31#include "SignalBus.h"
32
33#include <algorithm>
34#include <atomic>
35#include <string>
36#include <thread>
37#include <vector>
38
39namespace DSPatch
40{
41
43
65{
66public:
67 Component( const Component& ) = delete;
68 Component& operator=( const Component& ) = delete;
69
70 using SPtr = std::shared_ptr<Component>;
71
72 enum class ProcessOrder
73 {
74 InOrder,
75 OutOfOrder
76 };
77
78 Component( ProcessOrder processOrder = ProcessOrder::InOrder );
79 virtual ~Component();
80
81 bool ConnectInput( const Component::SPtr& fromComponent, int fromOutput, int toInput );
82
83 void DisconnectInput( int inputNo );
84 void DisconnectInput( const Component::SPtr& fromComponent );
85 void DisconnectAllInputs();
86
87 int GetInputCount() const;
88 int GetOutputCount() const;
89
90 std::string GetInputName( int inputNo ) const;
91 std::string GetOutputName( int outputNo ) const;
92
93 void SetBufferCount( int bufferCount, int startBuffer );
94 int GetBufferCount() const;
95
96 void Tick();
97 void Tick( int bufferNo );
98 void TickParallel();
99 void TickParallel( int bufferNo );
100
101 void Scan( std::vector<Component*>& components );
102 void ScanParallel( std::vector<std::vector<DSPatch::Component*>>& componentsMap, int& scanPosition );
103 void EndScan();
104
105protected:
106 inline virtual void Process_( SignalBus&, SignalBus& ) = 0;
107
108 void SetInputCount_( int inputCount, const std::vector<std::string>& inputNames = {} );
109 void SetOutputCount_( int outputCount, const std::vector<std::string>& outputNames = {} );
110
111private:
112 class AtomicFlag final
113 {
114 public:
115 AtomicFlag( const AtomicFlag& ) = delete;
116 AtomicFlag& operator=( const AtomicFlag& ) = delete;
117
118 inline AtomicFlag() = default;
119
120 inline AtomicFlag( AtomicFlag&& )
121 {
122 }
123
124 inline void WaitAndClear()
125 {
126 while ( flag.test_and_set( std::memory_order_acquire ) )
127 {
128 std::this_thread::yield();
129 }
130 }
131
132 inline void Set()
133 {
134 flag.clear( std::memory_order_release );
135 }
136
137 inline void Clear()
138 {
139 flag.test_and_set( std::memory_order_acquire );
140 }
141
142 private:
143 std::atomic_flag flag = { true }; // true here actually means unset / cleared
144 };
145
146 struct RefCounter final
147 {
148 int count = 0;
149 int total = 0;
150 AtomicFlag readyFlag;
151 };
152
153 struct Wire final
154 {
155 DSPatch::Component* fromComponent;
156 int fromOutput;
157 int toInput;
158 };
159
160 void _WaitForRelease( int bufferNo );
161 void _ReleaseNextBuffer( int bufferNo );
162
163 void _GetOutput( int fromOutput, int toInput, DSPatch::SignalBus& toBus );
164 void _GetOutput( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus );
165 void _GetOutputParallel( int fromOutput, int toInput, DSPatch::SignalBus& toBus );
166 void _GetOutputParallel( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus );
167
168 void _IncRefs( int output );
169 void _DecRefs( int output );
170
171 const DSPatch::Component::ProcessOrder _processOrder;
172
173 int _bufferCount = 0;
174
175 std::vector<DSPatch::SignalBus> _inputBuses;
176 std::vector<DSPatch::SignalBus> _outputBuses;
177
178 std::vector<std::vector<RefCounter>> _refs; // RefCounter per output, per buffer
179
180 std::vector<Wire> _inputWires;
181
182 std::vector<AtomicFlag> _releaseFlags;
183
184 std::vector<std::string> _inputNames;
185 std::vector<std::string> _outputNames;
186
187 int _scanPosition = -1;
188};
189
190inline Component::Component( ProcessOrder processOrder )
191 : _processOrder( processOrder )
192{
193 SetBufferCount( 1, 0 );
194}
195
196inline Component::~Component() = default;
197
198inline bool Component::ConnectInput( const Component::SPtr& fromComponent, int fromOutput, int toInput )
199{
200 if ( fromOutput >= fromComponent->GetOutputCount() || toInput >= GetInputCount() )
201 {
202 return false;
203 }
204
205 // first make sure there are no wires already connected to this input
206 auto findFn = [&toInput]( const auto& wire ) { return wire.toInput == toInput; };
207
208 if ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end() )
209 {
210 if ( it->fromComponent == fromComponent.get() && it->fromOutput == fromOutput )
211 {
212 // this wire already exists
213 return true;
214 }
215
216 // update source output's reference count
217 it->fromComponent->_DecRefs( it->fromOutput );
218
219 // clear input
220 for ( auto& inputBus : _inputBuses )
221 {
222 inputBus.ClearValue( toInput );
223 }
224
225 // replace wire
226 it->fromComponent = fromComponent.get();
227 it->fromOutput = fromOutput;
228 }
229 else
230 {
231 // add new wire
232 _inputWires.emplace_back( Wire{ fromComponent.get(), fromOutput, toInput } );
233 }
234
235 // update source output's reference count
236 fromComponent->_IncRefs( fromOutput );
237
238 return true;
239}
240
241inline void Component::DisconnectInput( int inputNo )
242{
243 // remove wires connected to inputNo from _inputWires
244 auto findFn = [&inputNo]( const auto& wire ) { return wire.toInput == inputNo; };
245
246 if ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end() )
247 {
248 // update source output's reference count
249 it->fromComponent->_DecRefs( it->fromOutput );
250
251 // clear input
252 for ( auto& inputBus : _inputBuses )
253 {
254 inputBus.ClearValue( inputNo );
255 }
256
257 // remove wire
258 _inputWires.erase( it );
259 }
260}
261
262inline void Component::DisconnectInput( const Component::SPtr& fromComponent )
263{
264 // remove fromComponent from _inputWires
265 auto findFn = [&fromComponent]( const auto& wire ) { return wire.fromComponent == fromComponent.get(); };
266
267 for ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end();
268 it = std::find_if( it, _inputWires.end(), findFn ) )
269 {
270 // update source output's reference count
271 fromComponent->_DecRefs( it->fromOutput );
272
273 // clear input
274 for ( auto& inputBus : _inputBuses )
275 {
276 inputBus.ClearValue( it->toInput );
277 }
278
279 // remove wire
280 it = _inputWires.erase( it );
281 }
282}
283
284inline void Component::DisconnectAllInputs()
285{
286 // update all source output reference counts
287 for ( const auto& wire : _inputWires )
288 {
289 wire.fromComponent->_DecRefs( wire.fromOutput );
290 }
291
292 // clear all inputs
293 for ( auto& inputBus : _inputBuses )
294 {
295 inputBus.ClearAllValues();
296 }
297
298 // remove all wires
299 _inputWires.clear();
300}
301
302inline int Component::GetInputCount() const
303{
304 return _inputBuses[0].GetSignalCount();
305}
306
307inline int Component::GetOutputCount() const
308{
309 return _outputBuses[0].GetSignalCount();
310}
311
312// cppcheck-suppress unusedFunction
313inline std::string Component::GetInputName( int inputNo ) const
314{
315 if ( inputNo < (int)_inputNames.size() )
316 {
317 return _inputNames[inputNo];
318 }
319 return "";
320}
321
322// cppcheck-suppress unusedFunction
323inline std::string Component::GetOutputName( int outputNo ) const
324{
325 if ( outputNo < (int)_outputNames.size() )
326 {
327 return _outputNames[outputNo];
328 }
329 return "";
330}
331
332inline void Component::SetBufferCount( int bufferCount, int startBuffer )
333{
334 // _bufferCount is the current thread count / bufferCount is new thread count
335
336 if ( bufferCount <= 0 )
337 {
338 bufferCount = 1; // there needs to be at least 1 buffer
339 }
340
341 if ( startBuffer >= bufferCount )
342 {
343 startBuffer = 0;
344 }
345
346 // resize vectors
347 _inputBuses.resize( bufferCount );
348 _outputBuses.resize( bufferCount );
349
350 _releaseFlags.resize( bufferCount );
351
352 _refs.resize( bufferCount );
353
354 const auto inputCount = GetInputCount();
355 const auto outputCount = GetOutputCount();
356 const auto refCount = _refs[0].size();
357
358 // init vector values
359 for ( int i = 0; i < bufferCount; ++i )
360 {
361 _inputBuses[i].SetSignalCount( inputCount );
362 _outputBuses[i].SetSignalCount( outputCount );
363
364 if ( i == startBuffer )
365 {
366 _releaseFlags[i].Set();
367 }
368 else
369 {
370 _releaseFlags[i].Clear();
371 }
372
373 _refs[i].resize( refCount );
374 for ( size_t j = 0; j < refCount; ++j )
375 {
376 // sync output reference counts
377 _refs[i][j].total = _refs[0][j].total;
378 }
379 }
380
381 _bufferCount = bufferCount;
382}
383
384inline int Component::GetBufferCount() const
385{
386 return _bufferCount;
387}
388
389inline void Component::Tick()
390{
391 auto& inputBus = _inputBuses.front();
392
393 for ( const auto& wire : _inputWires )
394 {
395 // get new inputs from incoming components
396 wire.fromComponent->_GetOutput( wire.fromOutput, wire.toInput, inputBus );
397 }
398
399 // call Process_() with newly aquired inputs
400 Process_( inputBus, _outputBuses.front() );
401}
402
403inline void Component::Tick( int bufferNo )
404{
405 auto& inputBus = _inputBuses[bufferNo];
406
407 for ( const auto& wire : _inputWires )
408 {
409 // get new inputs from incoming components
410 wire.fromComponent->_GetOutput( bufferNo, wire.fromOutput, wire.toInput, inputBus );
411 }
412
413 if ( _bufferCount != 1 && _processOrder == ProcessOrder::InOrder )
414 {
415 // wait for our turn to process
416 _WaitForRelease( bufferNo );
417
418 // call Process_() with newly aquired inputs
419 Process_( inputBus, _outputBuses[bufferNo] );
420
421 // signal that we're done processing
422 _ReleaseNextBuffer( bufferNo );
423 }
424 else
425 {
426 // call Process_() with newly aquired inputs
427 Process_( inputBus, _outputBuses[bufferNo] );
428 }
429}
430
431inline void Component::TickParallel()
432{
433 auto& inputBus = _inputBuses.front();
434
435 for ( const auto& wire : _inputWires )
436 {
437 // get new inputs from incoming components
438 wire.fromComponent->_GetOutputParallel( wire.fromOutput, wire.toInput, inputBus );
439 }
440
441 // call Process_() with newly aquired inputs
442 Process_( inputBus, _outputBuses.front() );
443
444 // signal that our outputs are ready
445 for ( auto& ref : _refs.front() )
446 {
447 // readyFlags are cleared in _GetOutputParallel() which ofc is only called on outputs with refs
448 if ( ref.total != 0 )
449 {
450 ref.readyFlag.Set();
451 }
452 }
453}
454
455inline void Component::TickParallel( int bufferNo )
456{
457 auto& inputBus = _inputBuses[bufferNo];
458
459 for ( const auto& wire : _inputWires )
460 {
461 // get new inputs from incoming components
462 wire.fromComponent->_GetOutputParallel( bufferNo, wire.fromOutput, wire.toInput, inputBus );
463 }
464
465 if ( _bufferCount != 1 && _processOrder == ProcessOrder::InOrder )
466 {
467 // wait for our turn to process
468 _WaitForRelease( bufferNo );
469
470 // call Process_() with newly aquired inputs
471 Process_( inputBus, _outputBuses[bufferNo] );
472
473 // signal that we're done processing
474 _ReleaseNextBuffer( bufferNo );
475 }
476 else
477 {
478 // call Process_() with newly aquired inputs
479 Process_( inputBus, _outputBuses[bufferNo] );
480 }
481
482 // signal that our outputs are ready
483 for ( auto& ref : _refs[bufferNo] )
484 {
485 // readyFlags are cleared in _GetOutputParallel() which ofc is only called on outputs with refs
486 if ( ref.total != 0 )
487 {
488 ref.readyFlag.Set();
489 }
490 }
491}
492
493inline void Component::Scan( std::vector<Component*>& components )
494{
495 // continue only if this component has not already been scanned
496 if ( _scanPosition != -1 )
497 {
498 return;
499 }
500
501 // initialize _scanPosition
502 _scanPosition = 0;
503
504 for ( const auto& wire : _inputWires )
505 {
506 // scan incoming components
507 wire.fromComponent->Scan( components );
508 }
509
510 components.emplace_back( this );
511}
512
513inline void Component::ScanParallel( std::vector<std::vector<DSPatch::Component*>>& componentsMap, int& scanPosition )
514{
515 // continue only if this component has not already been scanned
516 if ( _scanPosition != -1 )
517 {
518 scanPosition = _scanPosition;
519 return;
520 }
521
522 // initialize scanPositions
523 _scanPosition = 0;
524 scanPosition = 0;
525
526 for ( const auto& wire : _inputWires )
527 {
528 // scan incoming components
529 wire.fromComponent->ScanParallel( componentsMap, scanPosition );
530
531 // ensure we're using the furthest scanPosition detected
532 _scanPosition = std::max( _scanPosition, ++scanPosition );
533 }
534
535 // insert component at _scanPosition
536 while ( (int)componentsMap.size() <= _scanPosition )
537 {
538 componentsMap.emplace_back( std::vector<DSPatch::Component*>{} );
539 componentsMap.back().reserve( componentsMap.capacity() );
540 }
541 componentsMap[_scanPosition].emplace_back( this );
542}
543
544inline void Component::EndScan()
545{
546 // reset _scanPosition
547 _scanPosition = -1;
548}
549
550inline void Component::SetInputCount_( int inputCount, const std::vector<std::string>& inputNames )
551{
552 _inputNames = inputNames;
553
554 for ( auto& inputBus : _inputBuses )
555 {
556 inputBus.SetSignalCount( inputCount );
557 }
558
559 _inputWires.reserve( inputCount );
560}
561
562inline void Component::SetOutputCount_( int outputCount, const std::vector<std::string>& outputNames )
563{
564 _outputNames = outputNames;
565
566 for ( auto& outputBus : _outputBuses )
567 {
568 outputBus.SetSignalCount( outputCount );
569 }
570
571 // add reference counters for our new outputs
572 for ( auto& ref : _refs )
573 {
574 ref.resize( outputCount );
575 }
576}
577
578inline void Component::_WaitForRelease( int bufferNo )
579{
580 _releaseFlags[bufferNo].WaitAndClear();
581}
582
583inline void Component::_ReleaseNextBuffer( int bufferNo )
584{
585 if ( ++bufferNo == _bufferCount ) // release the next available buffer
586 {
587 _releaseFlags[0].Set();
588 }
589 else
590 {
591 _releaseFlags[bufferNo].Set();
592 }
593}
594
595inline void Component::_GetOutput( int fromOutput, int toInput, DSPatch::SignalBus& toBus )
596{
597 auto& signal = *_outputBuses.front().GetSignal( fromOutput );
598
599 if ( !signal.has_value() )
600 {
601 toBus.ClearValue( toInput );
602 return;
603 }
604
605 auto& ref = _refs.front()[fromOutput];
606
607 if ( ref.total == 1 )
608 {
609 // there's only one reference, move the signal
610 toBus.MoveSignal( toInput, signal );
611 }
612 else if ( ++ref.count != ref.total )
613 {
614 // this is not the final reference, copy the signal
615 toBus.SetSignal( toInput, signal );
616 }
617 else
618 {
619 // this is the final reference, reset the counter, move the signal
620 ref.count = 0;
621 toBus.MoveSignal( toInput, signal );
622 }
623}
624
625inline void Component::_GetOutput( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus )
626{
627 auto& signal = *_outputBuses[bufferNo].GetSignal( fromOutput );
628
629 if ( !signal.has_value() )
630 {
631 toBus.ClearValue( toInput );
632 return;
633 }
634
635 auto& ref = _refs[bufferNo][fromOutput];
636
637 if ( ref.total == 1 )
638 {
639 // there's only one reference, move the signal
640 toBus.MoveSignal( toInput, signal );
641 }
642 else if ( ++ref.count != ref.total )
643 {
644 // this is not the final reference, copy the signal
645 toBus.SetSignal( toInput, signal );
646 }
647 else
648 {
649 // this is the final reference, reset the counter, move the signal
650 ref.count = 0;
651 toBus.MoveSignal( toInput, signal );
652 }
653}
654
655inline void Component::_GetOutputParallel( int fromOutput, int toInput, DSPatch::SignalBus& toBus )
656{
657 auto& signal = *_outputBuses.front().GetSignal( fromOutput );
658 auto& ref = _refs.front()[fromOutput];
659
660 // wait for this output to be ready
661 ref.readyFlag.WaitAndClear();
662
663 if ( !signal.has_value() )
664 {
665 toBus.ClearValue( toInput );
666
667 if ( ref.total != 1 )
668 {
669 if ( ++ref.count != ref.total )
670 {
671 // this is not the final reference, wake next WaitAndClear()
672 ref.readyFlag.Set();
673 }
674 else
675 {
676 // this is the final reference, reset the counter
677 ref.count = 0;
678 }
679 }
680 }
681 else if ( ref.total == 1 )
682 {
683 // there's only one reference, move the signal
684 toBus.MoveSignal( toInput, signal );
685 }
686 else if ( ++ref.count != ref.total )
687 {
688 // this is not the final reference, copy the signal, wake next WaitAndClear()
689 toBus.SetSignal( toInput, signal );
690 ref.readyFlag.Set();
691 }
692 else
693 {
694 // this is the final reference, reset the counter, move the signal
695 ref.count = 0;
696 toBus.MoveSignal( toInput, signal );
697 }
698}
699
700inline void Component::_GetOutputParallel( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus )
701{
702 auto& signal = *_outputBuses[bufferNo].GetSignal( fromOutput );
703 auto& ref = _refs[bufferNo][fromOutput];
704
705 // wait for this output to be ready
706 ref.readyFlag.WaitAndClear();
707
708 if ( !signal.has_value() )
709 {
710 toBus.ClearValue( toInput );
711
712 if ( ref.total != 1 )
713 {
714 if ( ++ref.count != ref.total )
715 {
716 // this is not the final reference, wake next WaitAndClear()
717 ref.readyFlag.Set();
718 }
719 else
720 {
721 // this is the final reference, reset the counter
722 ref.count = 0;
723 }
724 }
725 }
726 else if ( ref.total == 1 )
727 {
728 // there's only one reference, move the signal
729 toBus.MoveSignal( toInput, signal );
730 }
731 else if ( ++ref.count != ref.total )
732 {
733 // this is not the final reference, copy the signal, wake next WaitAndClear()
734 toBus.SetSignal( toInput, signal );
735 ref.readyFlag.Set();
736 }
737 else
738 {
739 // this is the final reference, reset the counter, move the signal
740 ref.count = 0;
741 toBus.MoveSignal( toInput, signal );
742 }
743}
744
745inline void Component::_IncRefs( int output )
746{
747 for ( auto& ref : _refs )
748 {
749 ++ref[output].total;
750 }
751}
752
753inline void Component::_DecRefs( int output )
754{
755 for ( auto& ref : _refs )
756 {
757 --ref[output].total;
758 }
759}
760
761} // namespace DSPatch
Abstract base class for DSPatch components.
Definition Component.h:65
Signal container.
Definition SignalBus.h:53