Objective
The problem I am trying to solve is aggregating (summing the values of) a sequence of messages of type (int Key, int Value), per Key, until a closing observable emits a "flush" marker item.
For example, given the following sequence of (Key, Value) items:
(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)
When Flush is triggered, I expect an array with [(1,4), (2,3)] to be emitted.
When the sequence is completed, an array with [(2,6)] should be emitted.
What I've tried
I started with GroupBy + Aggregate + Buffer(flush) as an intuitive way of achieving the required behavior. However, the sequence does not emit intermediate results; all the aggregates arrive together as the final output.
Here's the full code:
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var completion = StartProcessing(source, flush);
source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted(); // emit of [(2,6)] is expected
await completion;
return;

static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
    return source
        .GroupBy
        (
            keySelector: message => message.Key
        )
        .SelectMany
        (
            group => group
                .Aggregate
                (
                    seed: (group.Key, Value: 0),
                    accumulator: (output, item) => (output.Key, output.Value + item.Value)
                )
        )
        .Buffer(flush)
        .Select(buffer => Observable.FromAsync(() => Flush(buffer)))
        .Merge()
        .ToTask();
}

static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
    Console.WriteLine($"Flushing [{string.Join(", ", data)}]");
    // processing takes time
    await Task.Delay(TimeSpan.FromSeconds(1));
}
The output is
Flushing []
Flushing [(1, 4), (2, 9)]
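As far as I understand it (not sure, though), the buffer emitted on flush is empty because Aggregate does not propagate intermediate values: each group's aggregate is only produced when the group completes, and with GroupBy that happens only when the source completes.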
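The hypothesis above can be checked in isolation. The following minimal sketch assumes the System.Reactive NuGet package; the class name AggregateVsScan is hypothetical. It feeds the values 1 and 3 into a Subject and records what Aggregate emits compared with Scan, an Rx operator that emits every intermediate accumulator value. Aggregate produces nothing until OnCompleted, while Scan produces a running total for each item.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AggregateVsScan
{
    // Pushes 1 and 3 through either Aggregate or Scan and records the
    // emitted values before and after the source completes.
    public static (List<int> BeforeCompletion, List<int> AfterCompletion) Run(bool useScan)
    {
        var source = new Subject<int>();
        var received = new List<int>();

        IObservable<int> totals = useScan
            ? source.Scan(0, (acc, x) => acc + x)        // emits a running total per item
            : source.Aggregate(0, (acc, x) => acc + x);  // emits only the final total

        using (totals.Subscribe(received.Add))
        {
            source.OnNext(1);
            source.OnNext(3);
            var before = new List<int>(received); // snapshot taken before OnCompleted
            source.OnCompleted();
            return (before, received);
        }
    }

    public static void Main()
    {
        var agg = Run(useScan: false);
        var scan = Run(useScan: true);
        Console.WriteLine($"Aggregate: before=[{string.Join(", ", agg.BeforeCompletion)}] after=[{string.Join(", ", agg.AfterCompletion)}]");
        Console.WriteLine($"Scan: before=[{string.Join(", ", scan.BeforeCompletion)}] after=[{string.Join(", ", scan.AfterCompletion)}]");
    }
}
```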
I also tried using GroupByUntil instead of GroupBy and Buffer. With that structure I get intermediate output when each group closes, but each group's aggregate is emitted one by one, and it is unclear how to bundle them so that they are not flushed separately:
Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)
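The GroupByUntil attempt is only described, not shown, so the following is a hypothetical reconstruction that assumes the System.Reactive package (GroupByUntilSketch is an illustrative name) and collects results into a list instead of calling Flush. Every group uses the flush subject as its duration observable, so all open groups close on the next flush. Because Aggregate runs once per group, each closed group produces its own notification rather than one combined array.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupByUntilSketch
{
    public static List<(int Key, int Value)> Run()
    {
        var source = new Subject<(int Key, int Value)>();
        var flush = new Subject<Unit>();
        var emitted = new List<(int Key, int Value)>();

        var sums = source
            // Each group lives until the next flush signal (or source completion).
            .GroupByUntil(m => m.Key, group => flush)
            // Aggregate emits once per closed group, so results arrive one by one.
            .SelectMany(group => group.Aggregate(
                (Key: group.Key, Value: 0),
                (acc, item) => (acc.Key, acc.Value + item.Value)));

        using (sums.Subscribe(emitted.Add))
        {
            source.OnNext((1, 1));
            source.OnNext((2, 3));
            source.OnNext((1, 3));
            flush.OnNext(Unit.Default); // closes the groups for keys 1 and 2
            source.OnNext((2, 1));      // opens a new group for key 2
            source.OnNext((2, 5));
            source.OnCompleted();       // closes the remaining group
        }
        return emitted;
    }

    public static void Main()
    {
        foreach (var sum in Run())
            Console.WriteLine($"Flushing {sum}");
    }
}
```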
So far I am stuck and looking for some help, specifically:
Update
This alternative solution seems to work:
static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
    return source
        // each flush closes the current window and opens a new one;
        // the last window closes when the source completes
        .Window(flush)
        .SelectMany(group => group
            .Aggregate
            (
                // a new dictionary is created for every window
                seed: new Dictionary<int, int>(),
                accumulator: (output, item) =>
                {
                    output[item.Key] = output.TryGetValue(item.Key, out var value)
                        ? value + item.Value
                        : item.Value;
                    return output;
                }
            )
            .SelectMany(buffer => Observable
                .FromAsync(() => Flush(buffer.Select(x => (x.Key, x.Value)))))
        )
        .ToTask();
}
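The accumulator used in this solution can be exercised without Rx. The sketch below runs the same dictionary update through LINQ-to-Objects Enumerable.Aggregate (not the Rx operator) over the two windows from the example; WindowAccumulator, Accumulate and SumWindow are hypothetical names. Each SumWindow call starts from a new dictionary, which mirrors how new Dictionary<int, int>() is evaluated as a fresh seed each time the SelectMany lambda runs for a window.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WindowAccumulator
{
    // Same update as the Rx accumulator: add item.Value to the sum stored under item.Key.
    public static Dictionary<int, int> Accumulate(Dictionary<int, int> output, (int Key, int Value) item)
    {
        output[item.Key] = output.TryGetValue(item.Key, out var value)
            ? value + item.Value
            : item.Value;
        return output;
    }

    // Sums one window, starting from a new dictionary on every call.
    public static Dictionary<int, int> SumWindow(IEnumerable<(int Key, int Value)> window) =>
        window.Aggregate(new Dictionary<int, int>(), Accumulate);

    public static void Main()
    {
        var first = SumWindow(new (int Key, int Value)[] { (1, 1), (2, 3), (1, 3) });
        var second = SumWindow(new (int Key, int Value)[] { (2, 1), (2, 5) });
        Console.WriteLine($"[{string.Join(", ", first.Select(kv => (kv.Key, kv.Value)))}]");
        Console.WriteLine($"[{string.Join(", ", second.Select(kv => (kv.Key, kv.Value)))}]");
    }
}
```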
Explanation
Window is used to produce a sequence that can be completed on demand by triggering the "flush" observable. This is like TakeUntil, but it can produce multiple observables instead of just one.
Aggregate performs the accumulation. Upon window completion triggered by "flush", it emits a dictionary with the accumulated values.
Observable.FromAsync creates a deferred action that starts when an observer subscribes, so it receives the buffer with the accumulated values and starts processing. FromAsync returns IObservable<Unit> and emits a value upon task completion.
SelectMany is used to flatten a sequence of observables into a single observable sequence. It replaces Select + Merge.
ToTask returns a Task that completes when the last Unit is received, indicating that all processing tasks are completed.
Updated:
I rewrote your example; it looks like it suits your needs:
static async Task Main()
{
    var source = new Subject<(int Key, int Value)>();
    var flush = new Subject<Unit>();
    var completion = StartProcessing(source, flush);
    source.OnNext((1, 1));
    source.OnNext((2, 3));
    source.OnNext((1, 3));
    flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
    source.OnNext((2, 1));
    source.OnNext((2, 5));
    source.OnCompleted(); // emit of [(2,6)] is expected
    await completion;
    return;
}

static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
    return source
        // collect the raw items received between two flushes (or until completion)
        .Buffer(flush)
        // LINQ-to-Objects: sum the values per key within one buffer
        .Select(list => list.GroupBy(i => i.Key).Select(group => (group.Key, Value: group.Sum(i => i.Value))))
        .Select(item => Observable.FromAsync(() => Flush(item)))
        .Merge()
        .ToTask();
}
static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
    Console.WriteLine($"Flushing [{string.Join(", ", data)}]");
    // processing takes time
    await Task.Delay(TimeSpan.FromSeconds(1));
}
Produces:
Flushing [(1, 4), (2, 3)]
Flushing [(2, 6)]
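To check this result without the one-second delay, here is a sketch that assumes the System.Reactive package (BufferSketch is an illustrative name). It keeps the same Buffer + GroupBy/Sum pipeline but collects each batch into a list instead of calling Flush. Because Buffer only collects raw items and the summing happens in LINQ-to-Objects on each completed buffer, every flush naturally starts from zero.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferSketch
{
    public static List<List<(int Key, int Value)>> Run()
    {
        var source = new Subject<(int Key, int Value)>();
        var flush = new Subject<Unit>();
        var batches = new List<List<(int Key, int Value)>>();

        var sums = source
            .Buffer(flush) // one list per flush, plus the remainder on completion
            .Select(list => list
                .GroupBy(i => i.Key)                                  // LINQ-to-Objects grouping
                .Select(g => (Key: g.Key, Value: g.Sum(i => i.Value)))
                .ToList());

        using (sums.Subscribe(batches.Add))
        {
            source.OnNext((1, 1));
            source.OnNext((2, 3));
            source.OnNext((1, 3));
            flush.OnNext(Unit.Default);
            source.OnNext((2, 1));
            source.OnNext((2, 5));
            source.OnCompleted();
        }
        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Run())
            Console.WriteLine($"Flushing [{string.Join(", ", batch)}]");
    }
}
```

One trade-off compared with the Window + Aggregate version: Buffer holds every raw item until the next flush, whereas the Window version keeps only one running sum per key. Buffer also emits an empty list when flush fires with no pending items, as the "Flushing []" line in the question's output shows.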