DSPatch v.11.4.1
Loading...
Searching...
No Matches
Component.h
1/******************************************************************************
2DSPatch - The Refreshingly Simple C++ Dataflow Framework
3Copyright (c) 2024, Marcus Tomlinson
4
5BSD 2-Clause License
6
7Redistribution and use in source and binary forms, with or without
8modification, are permitted provided that the following conditions are met:
9
101. Redistributions of source code must retain the above copyright notice, this
11 list of conditions and the following disclaimer.
12
132. Redistributions in binary form must reproduce the above copyright notice,
14 this list of conditions and the following disclaimer in the documentation
15 and/or other materials provided with the distribution.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27******************************************************************************/
28
29#pragma once
30
31#include "SignalBus.h"
32
33#include <algorithm>
34#include <atomic>
35#include <string>
36#include <thread>
37#include <vector>
38
39namespace DSPatch
40{
41
43
65{
66public:
67 Component( const Component& ) = delete;
68 Component& operator=( const Component& ) = delete;
69
70 using SPtr = std::shared_ptr<Component>;
71
72 enum class ProcessOrder
73 {
74 InOrder,
75 OutOfOrder
76 };
77
78 Component( ProcessOrder processOrder = ProcessOrder::InOrder );
79 virtual ~Component();
80
81 bool ConnectInput( const Component::SPtr& fromComponent, int fromOutput, int toInput );
82
83 void DisconnectInput( int inputNo );
84 void DisconnectInput( const Component::SPtr& fromComponent );
85 void DisconnectAllInputs();
86
87 int GetInputCount() const;
88 int GetOutputCount() const;
89
90 std::string GetInputName( int inputNo ) const;
91 std::string GetOutputName( int outputNo ) const;
92
93 void SetBufferCount( int bufferCount, int startBuffer );
94 int GetBufferCount() const;
95
96 void Tick( int bufferNo );
97 void TickParallel( int bufferNo );
98
99 void Scan( std::vector<Component*>& components );
100 void ScanParallel( std::vector<std::vector<DSPatch::Component*>>& componentsMap, int& scanPosition );
101 void EndScan();
102
103protected:
104 inline virtual void Process_( SignalBus&, SignalBus& ) = 0;
105
106 void SetInputCount_( int inputCount, const std::vector<std::string>& inputNames = {} );
107 void SetOutputCount_( int outputCount, const std::vector<std::string>& outputNames = {} );
108
109private:
110 class AtomicFlag final
111 {
112 public:
113 AtomicFlag( const AtomicFlag& ) = delete;
114 AtomicFlag& operator=( const AtomicFlag& ) = delete;
115
116 inline AtomicFlag() = default;
117
118 inline AtomicFlag( AtomicFlag&& )
119 {
120 }
121
122 inline void WaitAndClear()
123 {
124 while ( flag.test_and_set( std::memory_order_acquire ) )
125 {
126 std::this_thread::yield();
127 }
128 }
129
130 inline void Set()
131 {
132 flag.clear( std::memory_order_release );
133 }
134
135 inline void Clear()
136 {
137 flag.test_and_set( std::memory_order_acquire );
138 }
139
140 private:
141 std::atomic_flag flag = { true }; // true here actually means unset / cleared
142 };
143
144 struct RefCounter final
145 {
146 int count = 0;
147 int total = 0;
148 AtomicFlag readyFlag;
149 };
150
151 struct Wire final
152 {
153 DSPatch::Component* fromComponent;
154 int fromOutput;
155 int toInput;
156 };
157
158 void _WaitForRelease( int bufferNo );
159 void _ReleaseNextBuffer( int bufferNo );
160
161 void _GetOutput( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus );
162 void _GetOutputParallel( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus );
163
164 void _IncRefs( int output );
165 void _DecRefs( int output );
166
167 const DSPatch::Component::ProcessOrder _processOrder;
168
169 int _bufferCount = 0;
170
171 std::vector<DSPatch::SignalBus> _inputBuses;
172 std::vector<DSPatch::SignalBus> _outputBuses;
173
174 std::vector<std::vector<RefCounter>> _refs; // RefCounter per output, per buffer
175
176 std::vector<Wire> _inputWires;
177
178 std::vector<AtomicFlag> _releaseFlags;
179
180 std::vector<std::string> _inputNames;
181 std::vector<std::string> _outputNames;
182
183 int _scanPosition = -1;
184};
185
186inline Component::Component( ProcessOrder processOrder )
187 : _processOrder( processOrder )
188{
189 SetBufferCount( 1, 0 );
190}
191
192inline Component::~Component() = default;
193
194inline bool Component::ConnectInput( const Component::SPtr& fromComponent, int fromOutput, int toInput )
195{
196 if ( fromOutput >= fromComponent->GetOutputCount() || toInput >= GetInputCount() )
197 {
198 return false;
199 }
200
201 // first make sure there are no wires already connected to this input
202 auto findFn = [&toInput]( const auto& wire ) { return wire.toInput == toInput; };
203
204 if ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end() )
205 {
206 if ( it->fromComponent == fromComponent.get() && it->fromOutput == fromOutput )
207 {
208 // this wire already exists
209 return true;
210 }
211
212 // update source output's reference count
213 it->fromComponent->_DecRefs( it->fromOutput );
214
215 // clear input
216 for ( auto& inputBus : _inputBuses )
217 {
218 inputBus.ClearValue( toInput );
219 }
220
221 // replace wire
222 it->fromComponent = fromComponent.get();
223 it->fromOutput = fromOutput;
224 }
225 else
226 {
227 // add new wire
228 _inputWires.emplace_back( Wire{ fromComponent.get(), fromOutput, toInput } );
229 }
230
231 // update source output's reference count
232 fromComponent->_IncRefs( fromOutput );
233
234 return true;
235}
236
237inline void Component::DisconnectInput( int inputNo )
238{
239 // remove wires connected to inputNo from _inputWires
240 auto findFn = [&inputNo]( const auto& wire ) { return wire.toInput == inputNo; };
241
242 if ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end() )
243 {
244 // update source output's reference count
245 it->fromComponent->_DecRefs( it->fromOutput );
246
247 // clear input
248 for ( auto& inputBus : _inputBuses )
249 {
250 inputBus.ClearValue( inputNo );
251 }
252
253 // remove wire
254 _inputWires.erase( it );
255 }
256}
257
258inline void Component::DisconnectInput( const Component::SPtr& fromComponent )
259{
260 // remove fromComponent from _inputWires
261 auto findFn = [&fromComponent]( const auto& wire ) { return wire.fromComponent == fromComponent.get(); };
262
263 for ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end();
264 it = std::find_if( it, _inputWires.end(), findFn ) )
265 {
266 // update source output's reference count
267 fromComponent->_DecRefs( it->fromOutput );
268
269 // clear input
270 for ( auto& inputBus : _inputBuses )
271 {
272 inputBus.ClearValue( it->toInput );
273 }
274
275 // remove wire
276 it = _inputWires.erase( it );
277 }
278}
279
280inline void Component::DisconnectAllInputs()
281{
282 // update all source output reference counts
283 for ( const auto& wire : _inputWires )
284 {
285 wire.fromComponent->_DecRefs( wire.fromOutput );
286 }
287
288 // clear all inputs
289 for ( auto& inputBus : _inputBuses )
290 {
291 inputBus.ClearAllValues();
292 }
293
294 // remove all wires
295 _inputWires.clear();
296}
297
298inline int Component::GetInputCount() const
299{
300 return _inputBuses[0].GetSignalCount();
301}
302
303inline int Component::GetOutputCount() const
304{
305 return _outputBuses[0].GetSignalCount();
306}
307
308// cppcheck-suppress unusedFunction
309inline std::string Component::GetInputName( int inputNo ) const
310{
311 if ( inputNo < (int)_inputNames.size() )
312 {
313 return _inputNames[inputNo];
314 }
315 return "";
316}
317
318// cppcheck-suppress unusedFunction
319inline std::string Component::GetOutputName( int outputNo ) const
320{
321 if ( outputNo < (int)_outputNames.size() )
322 {
323 return _outputNames[outputNo];
324 }
325 return "";
326}
327
328inline void Component::SetBufferCount( int bufferCount, int startBuffer )
329{
330 // _bufferCount is the current thread count / bufferCount is new thread count
331
332 if ( bufferCount <= 0 )
333 {
334 bufferCount = 1; // there needs to be at least 1 buffer
335 }
336
337 if ( startBuffer >= bufferCount )
338 {
339 startBuffer = 0;
340 }
341
342 // resize vectors
343 _inputBuses.resize( bufferCount );
344 _outputBuses.resize( bufferCount );
345
346 _releaseFlags.resize( bufferCount );
347
348 _refs.resize( bufferCount );
349
350 const auto inputCount = GetInputCount();
351 const auto outputCount = GetOutputCount();
352 const auto refCount = _refs[0].size();
353
354 // init vector values
355 for ( int i = 0; i < bufferCount; ++i )
356 {
357 _inputBuses[i].SetSignalCount( inputCount );
358 _outputBuses[i].SetSignalCount( outputCount );
359
360 if ( i == startBuffer )
361 {
362 _releaseFlags[i].Set();
363 }
364 else
365 {
366 _releaseFlags[i].Clear();
367 }
368
369 _refs[i].resize( refCount );
370 for ( size_t j = 0; j < refCount; ++j )
371 {
372 // sync output reference counts
373 _refs[i][j].total = _refs[0][j].total;
374 }
375 }
376
377 _bufferCount = bufferCount;
378}
379
380inline int Component::GetBufferCount() const
381{
382 return _bufferCount;
383}
384
385inline void Component::Tick( int bufferNo )
386{
387 auto& inputBus = _inputBuses[bufferNo];
388
389 for ( const auto& wire : _inputWires )
390 {
391 // get new inputs from incoming components
392 wire.fromComponent->_GetOutput( bufferNo, wire.fromOutput, wire.toInput, inputBus );
393 }
394
395 if ( _bufferCount != 1 && _processOrder == ProcessOrder::InOrder )
396 {
397 // wait for our turn to process
398 _WaitForRelease( bufferNo );
399
400 // call Process_() with newly aquired inputs
401 Process_( inputBus, _outputBuses[bufferNo] );
402
403 // signal that we're done processing
404 _ReleaseNextBuffer( bufferNo );
405 }
406 else
407 {
408 // call Process_() with newly aquired inputs
409 Process_( inputBus, _outputBuses[bufferNo] );
410 }
411}
412
413inline void Component::TickParallel( int bufferNo )
414{
415 auto& inputBus = _inputBuses[bufferNo];
416
417 for ( const auto& wire : _inputWires )
418 {
419 // get new inputs from incoming components
420 wire.fromComponent->_GetOutputParallel( bufferNo, wire.fromOutput, wire.toInput, inputBus );
421 }
422
423 if ( _bufferCount != 1 && _processOrder == ProcessOrder::InOrder )
424 {
425 // wait for our turn to process
426 _WaitForRelease( bufferNo );
427
428 // call Process_() with newly aquired inputs
429 Process_( inputBus, _outputBuses[bufferNo] );
430
431 // signal that we're done processing
432 _ReleaseNextBuffer( bufferNo );
433 }
434 else
435 {
436 // call Process_() with newly aquired inputs
437 Process_( inputBus, _outputBuses[bufferNo] );
438 }
439
440 // signal that our outputs are ready
441 for ( auto& ref : _refs[bufferNo] )
442 {
443 // readyFlags are cleared in _GetOutputParallel() which ofc is only called on outputs with refs
444 if ( ref.total != 0 )
445 {
446 ref.readyFlag.Set();
447 }
448 }
449}
450
451inline void Component::Scan( std::vector<Component*>& components )
452{
453 // continue only if this component has not already been scanned
454 if ( _scanPosition != -1 )
455 {
456 return;
457 }
458
459 // initialize _scanPosition
460 _scanPosition = 0;
461
462 for ( const auto& wire : _inputWires )
463 {
464 // scan incoming components
465 wire.fromComponent->Scan( components );
466 }
467
468 components.emplace_back( this );
469}
470
471inline void Component::ScanParallel( std::vector<std::vector<DSPatch::Component*>>& componentsMap, int& scanPosition )
472{
473 // continue only if this component has not already been scanned
474 if ( _scanPosition != -1 )
475 {
476 scanPosition = _scanPosition;
477 return;
478 }
479
480 // initialize scanPositions
481 _scanPosition = 0;
482 scanPosition = 0;
483
484 for ( const auto& wire : _inputWires )
485 {
486 // scan incoming components
487 wire.fromComponent->ScanParallel( componentsMap, scanPosition );
488
489 // ensure we're using the furthest scanPosition detected
490 _scanPosition = std::max( _scanPosition, ++scanPosition );
491 }
492
493 // insert component at _scanPosition
494 while ( (int)componentsMap.size() <= _scanPosition )
495 {
496 componentsMap.emplace_back( std::vector<DSPatch::Component*>{} );
497 componentsMap.back().reserve( componentsMap.capacity() );
498 }
499 componentsMap[_scanPosition].emplace_back( this );
500}
501
502inline void Component::EndScan()
503{
504 // reset _scanPosition
505 _scanPosition = -1;
506}
507
508inline void Component::SetInputCount_( int inputCount, const std::vector<std::string>& inputNames )
509{
510 _inputNames = inputNames;
511
512 for ( auto& inputBus : _inputBuses )
513 {
514 inputBus.SetSignalCount( inputCount );
515 }
516
517 _inputWires.reserve( inputCount );
518}
519
520inline void Component::SetOutputCount_( int outputCount, const std::vector<std::string>& outputNames )
521{
522 _outputNames = outputNames;
523
524 for ( auto& outputBus : _outputBuses )
525 {
526 outputBus.SetSignalCount( outputCount );
527 }
528
529 // add reference counters for our new outputs
530 for ( auto& ref : _refs )
531 {
532 ref.resize( outputCount );
533 }
534}
535
536inline void Component::_WaitForRelease( int bufferNo )
537{
538 _releaseFlags[bufferNo].WaitAndClear();
539}
540
541inline void Component::_ReleaseNextBuffer( int bufferNo )
542{
543 if ( ++bufferNo == _bufferCount ) // release the next available buffer
544 {
545 _releaseFlags[0].Set();
546 }
547 else
548 {
549 _releaseFlags[bufferNo].Set();
550 }
551}
552
553inline void Component::_GetOutput( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus )
554{
555 auto& signal = *_outputBuses[bufferNo].GetSignal( fromOutput );
556
557 if ( !signal.has_value() )
558 {
559 toBus.ClearValue( toInput );
560 return;
561 }
562
563 auto& ref = _refs[bufferNo][fromOutput];
564
565 if ( ref.total == 1 )
566 {
567 // there's only one reference, move the signal
568 toBus.MoveSignal( toInput, signal );
569 }
570 else if ( ++ref.count != ref.total )
571 {
572 // this is not the final reference, copy the signal
573 toBus.SetSignal( toInput, signal );
574 }
575 else
576 {
577 // this is the final reference, reset the counter, move the signal
578 ref.count = 0;
579 toBus.MoveSignal( toInput, signal );
580 }
581}
582
583inline void Component::_GetOutputParallel( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus )
584{
585 auto& signal = *_outputBuses[bufferNo].GetSignal( fromOutput );
586 auto& ref = _refs[bufferNo][fromOutput];
587
588 // wait for this output to be ready
589 ref.readyFlag.WaitAndClear();
590
591 if ( !signal.has_value() )
592 {
593 toBus.ClearValue( toInput );
594
595 if ( ref.total != 1 )
596 {
597 if ( ++ref.count != ref.total )
598 {
599 // this is not the final reference, wake next WaitAndClear()
600 ref.readyFlag.Set();
601 }
602 else
603 {
604 // this is the final reference, reset the counter
605 ref.count = 0;
606 }
607 }
608 }
609 else if ( ref.total == 1 )
610 {
611 // there's only one reference, move the signal
612 toBus.MoveSignal( toInput, signal );
613 }
614 else if ( ++ref.count != ref.total )
615 {
616 // this is not the final reference, copy the signal, wake next WaitAndClear()
617 toBus.SetSignal( toInput, signal );
618 ref.readyFlag.Set();
619 }
620 else
621 {
622 // this is the final reference, reset the counter, move the signal
623 ref.count = 0;
624 toBus.MoveSignal( toInput, signal );
625 }
626}
627
628inline void Component::_IncRefs( int output )
629{
630 for ( auto& ref : _refs )
631 {
632 ++ref[output].total;
633 }
634}
635
636inline void Component::_DecRefs( int output )
637{
638 for ( auto& ref : _refs )
639 {
640 --ref[output].total;
641 }
642}
643
644} // namespace DSPatch
Abstract base class for DSPatch components.
Definition Component.h:65
Signal container.
Definition SignalBus.h:53