DSPatch v.11.2.0
Loading...
Searching...
No Matches
Component.h
1/******************************************************************************
2DSPatch - The Refreshingly Simple C++ Dataflow Framework
3Copyright (c) 2024, Marcus Tomlinson
4
5BSD 2-Clause License
6
7Redistribution and use in source and binary forms, with or without
8modification, are permitted provided that the following conditions are met:
9
101. Redistributions of source code must retain the above copyright notice, this
11 list of conditions and the following disclaimer.
12
132. Redistributions in binary form must reproduce the above copyright notice,
14 this list of conditions and the following disclaimer in the documentation
15 and/or other materials provided with the distribution.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27******************************************************************************/
28
29#pragma once
30
31#include "SignalBus.h"
32
33#include <algorithm>
34#include <atomic>
35#include <string>
36#include <thread>
37#include <vector>
38
39namespace DSPatch
40{
41
43
65{
66public:
67 Component( const Component& ) = delete;
68 Component& operator=( const Component& ) = delete;
69
70 using SPtr = std::shared_ptr<Component>;
71
// Selects how Tick() / TickParallel() schedule Process_() when a component has more than one buffer.
enum class ProcessOrder
{
    // Process_() waits for its buffer's release flag, then releases the next buffer once it is done
    InOrder = 0,

    // Process_() is called without waiting on any release flag
    OutOfOrder = 1
};
77
78 Component( ProcessOrder processOrder = ProcessOrder::InOrder );
79 virtual ~Component();
80
81 bool ConnectInput( const Component::SPtr& fromComponent, int fromOutput, int toInput );
82
83 void DisconnectInput( int inputNo );
84 void DisconnectInput( const Component::SPtr& fromComponent );
85 void DisconnectAllInputs();
86
87 int GetInputCount() const;
88 int GetOutputCount() const;
89
90 std::string GetInputName( int inputNo ) const;
91 std::string GetOutputName( int outputNo ) const;
92
93 void SetBufferCount( int bufferCount, int startBuffer );
94 int GetBufferCount() const;
95
96 void Tick( int bufferNo );
97 void TickParallel( int bufferNo );
98
99 void Scan( std::vector<Component*>& components );
100 void ScanParallel( std::vector<std::vector<DSPatch::Component*>>& componentsMap, int& scanPosition );
101 void EndScan();
102
103protected:
104 inline virtual void Process_( SignalBus&, SignalBus& ) = 0;
105
106 void SetInputCount_( int inputCount, const std::vector<std::string>& inputNames = {} );
107 void SetOutputCount_( int outputCount, const std::vector<std::string>& outputNames = {} );
108
109private:
// Binary "ready" signal built on std::atomic_flag.
//
// The underlying std::atomic_flag is used inverted: while it is *set* this AtomicFlag counts as cleared
// (not ready), and once it is *cleared* this AtomicFlag counts as set (ready). WaitAndClear() therefore spins
// until another thread calls Set(), and atomically consumes the signal when it returns.
class AtomicFlag final
{
public:
    AtomicFlag( const AtomicFlag& ) = delete;
    AtomicFlag& operator=( const AtomicFlag& ) = delete;

    // Constructs the flag in the cleared (not ready) state.
    inline AtomicFlag() = default;

    // "Move" construction deliberately does not transfer the source's state: the new flag simply starts out
    // cleared. It exists so that AtomicFlag can be stored in resizable containers such as std::vector.
    // Declared noexcept (the body cannot throw) so containers can relocate elements without losing their
    // exception guarantees.
    inline AtomicFlag( AtomicFlag&& ) noexcept
    {
    }

    // Blocks (yielding the thread between attempts) until the flag has been Set(), then clears it again.
    inline void WaitAndClear()
    {
        while ( flag.test_and_set( std::memory_order_acquire ) )
        {
            std::this_thread::yield();
        }
    }

    // Marks the flag as ready, letting one pending or future WaitAndClear() call return.
    inline void Set()
    {
        flag.clear( std::memory_order_release );
    }

    // Marks the flag as not ready without waiting.
    inline void Clear()
    {
        flag.test_and_set( std::memory_order_acquire );
    }

private:
    std::atomic_flag flag = { true }; // true here actually means unset / cleared
};
143
144 struct RefCounter final
145 {
146 int count = 0;
147 int total = 0;
148 AtomicFlag readyFlag;
149 };
150
151 struct Wire final
152 {
153 DSPatch::Component* fromComponent;
154 int fromOutput;
155 int toInput;
156 };
157
158 void _WaitForRelease( int bufferNo );
159 void _ReleaseNextBuffer( int bufferNo );
160
161 void _GetOutput( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus );
162 void _GetOutputParallel( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus );
163
164 void _IncRefs( int output );
165 void _DecRefs( int output );
166
167 const DSPatch::Component::ProcessOrder _processOrder;
168
169 int _bufferCount = 0;
170
171 std::vector<DSPatch::SignalBus> _inputBuses;
172 std::vector<DSPatch::SignalBus> _outputBuses;
173
174 std::vector<std::vector<RefCounter>> _refs; // RefCounter per output, per buffer
175
176 std::vector<Wire> _inputWires;
177
178 std::vector<AtomicFlag> _releaseFlags;
179
180 std::vector<std::string> _inputNames;
181 std::vector<std::string> _outputNames;
182
183 int _scanPosition = -1;
184};
185
186inline Component::Component( ProcessOrder processOrder )
187 : _processOrder( processOrder )
188{
189 SetBufferCount( 1, 0 );
190}
191
192inline Component::~Component() = default;
193
194inline bool Component::ConnectInput( const Component::SPtr& fromComponent, int fromOutput, int toInput )
195{
196 if ( fromOutput >= fromComponent->GetOutputCount() || toInput >= GetInputCount() )
197 {
198 return false;
199 }
200
201 // first make sure there are no wires already connected to this input
202 auto findFn = [&toInput]( const auto& wire ) { return wire.toInput == toInput; };
203
204 if ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end() )
205 {
206 if ( it->fromComponent == fromComponent.get() && it->fromOutput == fromOutput )
207 {
208 // this wire already exists
209 return true;
210 }
211
212 // update source output's reference count
213 it->fromComponent->_DecRefs( it->fromOutput );
214
215 // replace wire
216 it->fromComponent = fromComponent.get();
217 it->fromOutput = fromOutput;
218 }
219 else
220 {
221 // add new wire
222 _inputWires.emplace_back( Wire{ fromComponent.get(), fromOutput, toInput } );
223 }
224
225 // update source output's reference count
226 fromComponent->_IncRefs( fromOutput );
227
228 return true;
229}
230
231inline void Component::DisconnectInput( int inputNo )
232{
233 // remove wires connected to inputNo from _inputWires
234 auto findFn = [&inputNo]( const auto& wire ) { return wire.toInput == inputNo; };
235
236 if ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end() )
237 {
238 // update source output's reference count
239 it->fromComponent->_DecRefs( it->fromOutput );
240
241 _inputWires.erase( it );
242 }
243}
244
245inline void Component::DisconnectInput( const Component::SPtr& fromComponent )
246{
247 // remove fromComponent from _inputWires
248 auto findFn = [&fromComponent]( const auto& wire ) { return wire.fromComponent == fromComponent.get(); };
249
250 for ( auto it = std::find_if( _inputWires.begin(), _inputWires.end(), findFn ); it != _inputWires.end();
251 it = std::find_if( it, _inputWires.end(), findFn ) )
252 {
253 // update source output's reference count
254 fromComponent->_DecRefs( it->fromOutput );
255
256 it = _inputWires.erase( it );
257 }
258}
259
260inline void Component::DisconnectAllInputs()
261{
262 // remove all wires from _inputWires
263 for ( const auto& wire : _inputWires )
264 {
265 // update source output's reference count
266 wire.fromComponent->_DecRefs( wire.fromOutput );
267 }
268
269 _inputWires.clear();
270}
271
272inline int Component::GetInputCount() const
273{
274 return _inputBuses[0].GetSignalCount();
275}
276
277inline int Component::GetOutputCount() const
278{
279 return _outputBuses[0].GetSignalCount();
280}
281
282// cppcheck-suppress unusedFunction
283inline std::string Component::GetInputName( int inputNo ) const
284{
285 if ( inputNo < (int)_inputNames.size() )
286 {
287 return _inputNames[inputNo];
288 }
289 return "";
290}
291
292// cppcheck-suppress unusedFunction
293inline std::string Component::GetOutputName( int outputNo ) const
294{
295 if ( outputNo < (int)_outputNames.size() )
296 {
297 return _outputNames[outputNo];
298 }
299 return "";
300}
301
302inline void Component::SetBufferCount( int bufferCount, int startBuffer )
303{
304 // _bufferCount is the current thread count / bufferCount is new thread count
305
306 if ( bufferCount <= 0 )
307 {
308 bufferCount = 1; // there needs to be at least 1 buffer
309 }
310
311 if ( startBuffer >= bufferCount )
312 {
313 startBuffer = 0;
314 }
315
316 // resize vectors
317 _inputBuses.resize( bufferCount );
318 _outputBuses.resize( bufferCount );
319
320 _releaseFlags.resize( bufferCount );
321
322 _refs.resize( bufferCount );
323
324 const auto inputCount = GetInputCount();
325 const auto outputCount = GetOutputCount();
326 const auto refCount = _refs[0].size();
327
328 // init vector values
329 for ( int i = 0; i < bufferCount; ++i )
330 {
331 _inputBuses[i].SetSignalCount( inputCount );
332 _outputBuses[i].SetSignalCount( outputCount );
333
334 if ( i == startBuffer )
335 {
336 _releaseFlags[i].Set();
337 }
338 else
339 {
340 _releaseFlags[i].Clear();
341 }
342
343 _refs[i].resize( refCount );
344 for ( size_t j = 0; j < refCount; ++j )
345 {
346 // sync output reference counts
347 _refs[i][j].total = _refs[0][j].total;
348 }
349 }
350
351 _bufferCount = bufferCount;
352}
353
354inline int Component::GetBufferCount() const
355{
356 return (int)_inputBuses.size();
357}
358
359inline void Component::Tick( int bufferNo )
360{
361 auto& inputBus = _inputBuses[bufferNo];
362 auto& outputBus = _outputBuses[bufferNo];
363
364 // clear inputs
365 inputBus.ClearAllValues();
366
367 for ( const auto& wire : _inputWires )
368 {
369 // get new inputs from incoming components
370 wire.fromComponent->_GetOutput( bufferNo, wire.fromOutput, wire.toInput, inputBus );
371 }
372
373 // clear outputs
374 outputBus.ClearAllValues();
375
376 if ( _bufferCount != 1 && _processOrder == ProcessOrder::InOrder )
377 {
378 // wait for our turn to process
379 _WaitForRelease( bufferNo );
380
381 // call Process_() with newly aquired inputs
382 Process_( inputBus, outputBus );
383
384 // signal that we're done processing
385 _ReleaseNextBuffer( bufferNo );
386 }
387 else
388 {
389 // call Process_() with newly aquired inputs
390 Process_( inputBus, outputBus );
391 }
392}
393
394inline void Component::TickParallel( int bufferNo )
395{
396 auto& inputBus = _inputBuses[bufferNo];
397 auto& outputBus = _outputBuses[bufferNo];
398
399 // clear inputs and outputs
400 inputBus.ClearAllValues();
401 outputBus.ClearAllValues();
402
403 for ( const auto& wire : _inputWires )
404 {
405 // get new inputs from incoming components
406 wire.fromComponent->_GetOutputParallel( bufferNo, wire.fromOutput, wire.toInput, inputBus );
407 }
408
409 if ( _bufferCount != 1 && _processOrder == ProcessOrder::InOrder )
410 {
411 // wait for our turn to process
412 _WaitForRelease( bufferNo );
413
414 // call Process_() with newly aquired inputs
415 Process_( inputBus, outputBus );
416
417 // signal that we're done processing
418 _ReleaseNextBuffer( bufferNo );
419 }
420 else
421 {
422 // call Process_() with newly aquired inputs
423 Process_( inputBus, outputBus );
424 }
425
426 // signal that our outputs are ready
427 for ( auto& ref : _refs[bufferNo] )
428 {
429 // readyFlags are cleared in _GetOutputParallel() which ofc is only called on outputs with refs
430 if ( ref.total != 0 )
431 {
432 ref.readyFlag.Set();
433 }
434 }
435}
436
437inline void Component::Scan( std::vector<Component*>& components )
438{
439 // continue only if this component has not already been scanned
440 if ( _scanPosition != -1 )
441 {
442 return;
443 }
444
445 // initialize _scanPosition
446 _scanPosition = 0;
447
448 for ( const auto& wire : _inputWires )
449 {
450 // scan incoming components
451 wire.fromComponent->Scan( components );
452 }
453
454 components.emplace_back( this );
455}
456
457inline void Component::ScanParallel( std::vector<std::vector<DSPatch::Component*>>& componentsMap, int& scanPosition )
458{
459 // continue only if this component has not already been scanned
460 if ( _scanPosition != -1 )
461 {
462 scanPosition = _scanPosition;
463 return;
464 }
465
466 // initialize scanPositions
467 _scanPosition = 0;
468 scanPosition = 0;
469
470 for ( const auto& wire : _inputWires )
471 {
472 // scan incoming components
473 wire.fromComponent->ScanParallel( componentsMap, scanPosition );
474
475 // ensure we're using the furthest scanPosition detected
476 _scanPosition = std::max( _scanPosition, ++scanPosition );
477 }
478
479 // insert component at _scanPosition
480 if ( _scanPosition == (int)componentsMap.size() )
481 {
482 componentsMap.emplace_back( std::vector<DSPatch::Component*>{} );
483 componentsMap[_scanPosition].reserve( componentsMap.capacity() );
484 }
485 componentsMap[_scanPosition].emplace_back( this );
486}
487
488inline void Component::EndScan()
489{
490 // reset _scanPosition
491 _scanPosition = -1;
492}
493
494inline void Component::SetInputCount_( int inputCount, const std::vector<std::string>& inputNames )
495{
496 _inputNames = inputNames;
497
498 for ( auto& inputBus : _inputBuses )
499 {
500 inputBus.SetSignalCount( inputCount );
501 }
502
503 _inputWires.reserve( inputCount );
504}
505
506inline void Component::SetOutputCount_( int outputCount, const std::vector<std::string>& outputNames )
507{
508 _outputNames = outputNames;
509
510 for ( auto& outputBus : _outputBuses )
511 {
512 outputBus.SetSignalCount( outputCount );
513 }
514
515 // add reference counters for our new outputs
516 for ( auto& ref : _refs )
517 {
518 ref.resize( outputCount );
519 }
520}
521
522inline void Component::_WaitForRelease( int bufferNo )
523{
524 _releaseFlags[bufferNo].WaitAndClear();
525}
526
527inline void Component::_ReleaseNextBuffer( int bufferNo )
528{
529 if ( ++bufferNo == _bufferCount ) // release the next available buffer
530 {
531 _releaseFlags[0].Set();
532 }
533 else
534 {
535 _releaseFlags[bufferNo].Set();
536 }
537}
538
539inline void Component::_GetOutput( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus )
540{
541 auto& signal = *_outputBuses[bufferNo].GetSignal( fromOutput );
542
543 if ( !signal.has_value() )
544 {
545 return;
546 }
547
548 auto& ref = _refs[bufferNo][fromOutput];
549
550 if ( ref.total == 1 )
551 {
552 // there's only one reference, move the signal immediately
553 toBus.MoveSignal( toInput, signal );
554 }
555 else if ( ++ref.count != ref.total )
556 {
557 // this is not the final reference, copy the signal
558 toBus.SetSignal( toInput, signal );
559 }
560 else
561 {
562 // this is the final reference, reset the counter, move the signal
563 ref.count = 0;
564 toBus.MoveSignal( toInput, signal );
565 }
566}
567
568inline void Component::_GetOutputParallel( int bufferNo, int fromOutput, int toInput, DSPatch::SignalBus& toBus )
569{
570 auto& signal = *_outputBuses[bufferNo].GetSignal( fromOutput );
571 auto& ref = _refs[bufferNo][fromOutput];
572
573 // wait for this output to be ready
574 ref.readyFlag.WaitAndClear();
575
576 if ( !signal.has_value() )
577 {
578 return;
579 }
580
581 if ( ref.total == 1 )
582 {
583 // there's only one reference, move the signal immediately and return
584 toBus.MoveSignal( toInput, signal );
585 }
586 else if ( ++ref.count != ref.total )
587 {
588 // this is not the final reference, copy the signal
589 toBus.SetSignal( toInput, signal );
590
591 // wake next WaitAndClear()
592 ref.readyFlag.Set();
593 }
594 else
595 {
596 // this is the final reference, reset the counter, move the signal
597 ref.count = 0;
598 toBus.MoveSignal( toInput, signal );
599 }
600}
601
602inline void Component::_IncRefs( int output )
603{
604 for ( auto& ref : _refs )
605 {
606 ++ref[output].total;
607 }
608}
609
610inline void Component::_DecRefs( int output )
611{
612 for ( auto& ref : _refs )
613 {
614 --ref[output].total;
615 }
616}
617
618} // namespace DSPatch
Abstract base class for DSPatch components.
Definition Component.h:65
Signal container.
Definition SignalBus.h:53