I want to write an application that evaluates sensor data from two sensors. Both sensors send their data in Package objects, which are split into Frame objects. A Package is essentially a Tuple<Timestamp, Data[]>, a Frame is a Tuple<Timestamp, Data>. I then always need to consume the Frame with the earliest timestamp from both sources.
So basically my object stream is:
Package -(1:n)-> Frame \
                        }-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /
Assume each Package contains either 2 or 3 values (reality: 5-7) and integer timestamps that increment by 1 (reality: ~200Hz => ~5ms increment). The "data" is just timestamp * 100 for the sake of simplicity.
Packages (timestamp, values[])
Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
(29, [2700, 2800, 2900]), ...}
Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
(26, [2400, 2500, 2600]), ...}
After the (1:n) steps:
Frames (timestamp, value)
Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
(22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
(29, 2900), ...}
Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
(20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}
After the pair synchronized step:
Merged tuples (timestamp, source1, source2)
{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
(19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
(24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}
Note that timestamp 23 is missing since neither source sent a value for it. That's just a side effect. I can put an empty tuple in or not, it doesn't matter. It also doesn't matter whether the tuple is (27, 2700, 2700) or ((27, 2700), (27, 2700)), i.e. Tuple<Timestamp, Data, Data> or Tuple<Frame, Frame>.
I'm pretty sure the (1:n) part should be a TransformManyBlock<Package, Frame>, if I got the documentation right.
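For illustration, the (1:n) step could look roughly like the sketch below. The Frame and Package records are hypothetical stand-ins for the real types, and the rule that the package timestamp belongs to its last value is only inferred from the sample data above (package (19, [1700, 1800, 1900]) becomes frames 17, 18, 19).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shapes for illustration; the real Package/Frame types may differ.
public record Frame(int Timestamp, int Data);
public record Package(int Timestamp, int[] Data);

public static class PackageSplitter
{
    // Assumption inferred from the sample data: the package timestamp belongs
    // to its LAST value, and the values are exactly one tick apart.
    public static IEnumerable<Frame> Split(Package package)
    {
        int first = package.Timestamp - (package.Data.Length - 1);
        return package.Data.Select((value, i) => new Frame(first + i, value));
    }
}

// With TPL Dataflow, the same function would be handed to the block:
// var splitter = new TransformManyBlock<Package, Frame>(p => PackageSplitter.Split(p));
```

Keeping the split logic in a plain function makes it easy to check without setting up a pipeline.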
But which block do I use for the pair synchronized part? At first, I thought the JoinBlock<Frame, Frame> would be what I was looking for, but it appears that it just pairs two elements index-wise. That is not an option, because it is neither guaranteed that both pipelines start with the same timestamp nor that both pipelines always produce a steady stream of consecutive timestamps (packages with a few frames may sometimes be lost in transmission). So what I need is more of a "MergeBlock" that lets me decide which element of the two input streams to propagate to the output next (if any).
I figured I'd have to write something like this myself. But I'm having trouble writing the code that properly handles two ISourceBlock variables and one ITargetBlock variable. I'm basically stuck as early as can be:
private void MergeSynchronized(
    ISourceBlock<Frame> source1,
    ISourceBlock<Frame> source2,
    ITargetBlock<Tuple<Frame, Frame>> target)
{
    var frame1 = source1.Receive();
    var frame2 = source2.Receive();
    //Loop {
    //    Depending on the timestamp [mis]match,
    //    either pair frame1+frame2 or frame1+null or null+frame2, and
    //    replace whichever frame(s) was/were propagated already
    //    with the next frame from the respective pipeline
    //}
}
I'm not even sure about this draft: Should the method be async so I can use var frame1 = await source1.ReceiveAsync();? What is the loop's condition? Where and how do I check for completion? How do I solve the obvious problem that, with my code, I have to wait until a gap in a stream is over before I can tell that there was a gap?
The alternative I thought about is to add an additional block to each pipeline, ensuring that enough "sentinel frames" are put into the pipeline per sensor so that always aligning the first frame from each pipeline aligns the correct two. I guess that would be a kind of TransformManyBlock which reads a Frame, compares the "expected" timestamp with the actual timestamp, and then inserts sentinel frames for the missing timestamps until the frame's timestamp is correct again.
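A rough sketch of that sentinel idea follows. It assumes integer timestamps that advance by a fixed step, as in the simplified example, and it uses a value tuple whose Data is null to mark a sentinel frame; the GapFiller and FillGaps names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class GapFiller
{
    // Yields every input frame and inserts a sentinel (Data == null) for
    // each timestamp that was skipped within this one stream.
    // Assumes strictly increasing timestamps that advance by `step`.
    public static IEnumerable<(int Timestamp, int? Data)> FillGaps(
        IEnumerable<(int Timestamp, int? Data)> frames, int step = 1)
    {
        int? expected = null; // unknown until the first frame arrives
        foreach (var frame in frames)
        {
            // Emit one sentinel per missing timestamp before the real frame
            while (expected is int e && e < frame.Timestamp)
            {
                yield return (e, null);
                expected = e + step;
            }
            yield return frame;
            expected = frame.Timestamp + step;
        }
    }
}
```

Note that this only fills gaps inside a single stream. Two streams that start at different timestamps, or a stream that ends early, would still be misaligned by an index-wise join, and with real ~5ms timestamps the "expected" value would need a tolerance.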
Or is the pair synchronized part the place to stop with TPL Dataflow objects and start the actual code that already works with the Data part?
Here is an implementation of a SynchronizedJoinBlock block, similar to the one presented in Hardy Hobeck's answer. This one takes care of some minor details, like cancellation, handling exceptions, and propagating the remaining items when the input blocks Target1 and Target2 are marked as completed. Also, the merging logic does not involve recursion, which should make it perform better (hopefully, I didn't measure it) and not be susceptible to stack overflow exceptions. Small deviation: the output is a ValueTuple<T1, T2> instead of a Tuple<T1, T2> (with the intention of reducing allocations).
public sealed class SynchronizedJoinBlock<T1, T2> : IReceivableSourceBlock<(T1, T2)>
{
    private readonly Func<T1, T2, int> _comparison;
    private readonly Queue<T1> _queue1 = new Queue<T1>();
    private readonly Queue<T2> _queue2 = new Queue<T2>();
    private readonly ActionBlock<T1> _input1;
    private readonly ActionBlock<T2> _input2;
    private readonly BufferBlock<(T1, T2)> _output;
    private readonly object _locker = new object();

    public SynchronizedJoinBlock(Func<T1, T2, int> comparison,
        CancellationToken cancellationToken = default)
    {
        _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));

        // Create the three internal blocks
        var options = new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken
        };
        _input1 = new ActionBlock<T1>(Add1, options);
        _input2 = new ActionBlock<T2>(Add2, options);
        _output = new BufferBlock<(T1, T2)>(options);

        // Link the input blocks with the output block
        var inputTasks = new Task[] { _input1.Completion, _input2.Completion };
        foreach (var inputTask in inputTasks)
        {
            // If ANY input block fails, then the whole block has failed.
            // Each input is observed separately, so that a failure is also
            // caught when the other input has already completed successfully.
            inputTask.ContinueWith(t =>
            {
                ((IDataflowBlock)_output).Fault(t.Exception.InnerException);
                if (!_input1.Completion.IsCompleted) _input1.Complete();
                if (!_input2.Completion.IsCompleted) _input2.Complete();
                ClearQueues();
            }, default, TaskContinuationOptions.OnlyOnFaulted |
                TaskContinuationOptions.RunContinuationsAsynchronously,
                TaskScheduler.Default);
        }
        Task.WhenAll(inputTasks).ContinueWith(t =>
        {
            // If ALL input blocks succeeded, then the whole block has succeeded
            try
            {
                if (!t.IsCanceled) PostRemaining(); // Post what's left
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)_output).Fault(ex);
            }
            _output.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
    }

    public ITargetBlock<T1> Target1 => _input1;
    public ITargetBlock<T2> Target2 => _input2;
    public Task Completion => _output.Completion;

    private void Add1(T1 value1)
    {
        lock (_locker)
        {
            _queue1.Enqueue(value1);
            FindAndPostMatched_Unsafe();
        }
    }

    private void Add2(T2 value2)
    {
        lock (_locker)
        {
            _queue2.Enqueue(value2);
            FindAndPostMatched_Unsafe();
        }
    }

    // Must be called while holding the lock
    private void FindAndPostMatched_Unsafe()
    {
        while (_queue1.Count > 0 && _queue2.Count > 0)
        {
            var result = _comparison(_queue1.Peek(), _queue2.Peek());
            if (result < 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            else if (result > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
            else // result == 0
            {
                _output.Post((_queue1.Dequeue(), _queue2.Dequeue()));
            }
        }
    }

    private void PostRemaining()
    {
        lock (_locker)
        {
            while (_queue1.Count > 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            while (_queue2.Count > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
        }
    }

    private void ClearQueues()
    {
        lock (_locker)
        {
            _queue1.Clear();
            _queue2.Clear();
        }
    }

    public void Complete() => _output.Complete();

    public void Fault(Exception exception)
        => ((IDataflowBlock)_output).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<(T1, T2)> target,
        DataflowLinkOptions linkOptions)
        => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<(T1, T2)> filter, out (T1, T2) item)
        => _output.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<(T1, T2)> items)
        => _output.TryReceiveAll(out items);

    (T1, T2) ISourceBlock<(T1, T2)>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target,
        out bool messageConsumed)
        => ((ISourceBlock<(T1, T2)>)_output).ConsumeMessage(
            messageHeader, target, out messageConsumed);

    void ISourceBlock<(T1, T2)>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReleaseReservation(
            messageHeader, target);

    bool ISourceBlock<(T1, T2)>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReserveMessage(
            messageHeader, target);
}
Usage example:
var joinBlock = new SynchronizedJoinBlock<(int, int), (int, int)>(
    (x, y) => Comparer<int>.Default.Compare(x.Item1, y.Item1));

var source1 = new (int, int)[] {(17, 1700), (18, 1800), (19, 1900),
    (20, 2000), (21, 2100), (22, 2200), (25, 2500), (26, 2600),
    (27, 2700), (28, 2800), (29, 2900)};
var source2 = new (int, int)[] {(15, 1500), (16, 1600), (17, 1700),
    (18, 1800), (19, 1900), (20, 2000), (21, 2100), (24, 2400),
    (25, 2500), (26, 2600)};

Array.ForEach(source1, x => joinBlock.Target1.Post(x));
Array.ForEach(source2, x => joinBlock.Target2.Post(x));
joinBlock.Target1.Complete();
joinBlock.Target2.Complete();

while (joinBlock.OutputAvailableAsync().Result)
{
    Console.WriteLine($"> Received: {joinBlock.Receive()}");
}
Output:
> Received: ((0, 0), (15, 1500))
> Received: ((0, 0), (16, 1600))
> Received: ((17, 1700), (17, 1700))
> Received: ((18, 1800), (18, 1800))
> Received: ((19, 1900), (19, 1900))
> Received: ((20, 2000), (20, 2000))
> Received: ((21, 2100), (21, 2100))
> Received: ((22, 2200), (0, 0))
> Received: ((0, 0), (24, 2400))
> Received: ((25, 2500), (25, 2500))
> Received: ((26, 2600), (26, 2600))
> Received: ((27, 2700), (0, 0))
> Received: ((28, 2800), (0, 0))
> Received: ((29, 2900), (0, 0))
It is assumed that the incoming data are ordered.
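If the ordering cannot be taken for granted, a small guard in front of each input could at least surface violations early. The sketch below is not part of the block above; OrderingGuard and EnsureAscending are hypothetical names, and it assumes that equal keys are acceptable while a smaller key after a larger one is an error.

```csharp
using System;
using System.Collections.Generic;

public static class OrderingGuard
{
    // Passes items through unchanged and throws as soon as a key is
    // smaller than the key of the previous item.
    public static IEnumerable<T> EnsureAscending<T>(
        IEnumerable<T> source, Func<T, int> keySelector)
    {
        bool hasPrevious = false;
        int previous = 0;
        foreach (var item in source)
        {
            int key = keySelector(item);
            if (hasPrevious && key < previous)
                throw new InvalidOperationException(
                    $"Out-of-order key {key} after {previous}.");
            previous = key;
            hasPrevious = true;
            yield return item;
        }
    }
}
```

For example, wrapping a source array as OrderingGuard.EnsureAscending(source1, x => x.Item1) before posting its items would throw on the first out-of-order timestamp instead of silently producing mismatched pairs.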
This class shares a similar structure with the JoinDependencyBlock class I posted some time ago in a somewhat related question.