Let's say i have multiple inputs (Input1, Input2 etc) of data types that can arrive at different times. If data for input 1 arrives, then i require Input2 to arrive within a certain (configurable) period. If the input arrives, then the data flow should output the received inputs. If the timeout occurs, then just the result of the input that was received should be outputted and the state reset for the next input(s) to be received. What would an appropriate TPL dataflow be for this scenario ? I think a BatchJoinBlock can be used but what about the timeout behaviour ?
This is not a 100% implementation of the algorithm you wrote, but it should help you to find your way:
using System.Reactive.Linq;
using System.Reactive.Subjects;
// I'm using Subjects because they are easy.
// There are many more ways to create IObservables.
var input1 = new Subject<string?>();
var input2 = new Subject<int?>();
var input3 = new Subject<bool?>();
// This is the code that handles the timeout after 1 second
// It creates a "Timer" that is an IObservable that automatically emits after a delay.
Observable
.Timer(TimeSpan.FromSeconds(1))
.Subscribe(_ =>
{
// Emit nulls on all inputs after 1 seconds.
// This will make the CombineLatest below observable emit a value.
input1.OnNext(null);
input2.OnNext(null);
input3.OnNext(null);
});
// Create a combined observable that when all 3 inputs have emitted a value at least once,
// takes the first emitted values of every one, and combines them into a tuple.
Observable
.CombineLatest(
input1.FirstOrDefaultAsync(),
input2.FirstOrDefaultAsync(),
input3.FirstOrDefaultAsync(),
(a, b, c) => (a, b, c))
.Subscribe(x => Console.WriteLine($"{x}"));
// Now we start emitting values on the inputs somehow
await Task.Run(() =>
{
// Data for input 1 arrives,
input1.OnNext("a");
// Data for input 2 arrives,
//input2.OnNext(3);
// Data for input 3 arrives,
input3.OnNext(true);
});
// A moment to see the output
await Task.Delay(2000);
This code ends up showing this in the console after the timeout ends:
(a, , True)