Tags: c#, system.reactive, .net-8.0, rx.net

Rx.Net - How to Aggregate messages and emit intermediate output when closing sequence gets triggered?


Objective

The problem I am trying to solve is aggregating a sequence of messages of type (int Key, int Value), summing the values per key, until a closing observable emits a "flush" marker item.

For example, given a sequence of (Key,Value) items

(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)

When Flush is triggered, I expect an array [(1,4), (2,3)] to be emitted.

When the sequence completes, an array [(2,6)] should be emitted.
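To pin down the expected semantics independently of Rx, here is a plain LINQ reference model. It is a sketch, not an Rx solution: it models the flush marker as `null` and treats the end of the input as a final flush. `ExpectedBatches` is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Plain LINQ reference model of the expected output (no Rx involved).
// The input is modeled as a list in which null stands for the "flush" marker.
var input = new (int Key, int Value)?[] { (1, 1), (2, 3), (1, 3), null, (2, 1), (2, 5) };

var batches = ExpectedBatches(input);
foreach (var batch in batches)
    Console.WriteLine($"[{string.Join(", ", batch)}]");
// prints:
// [(1, 4), (2, 3)]
// [(2, 6)]

// Hypothetical helper: sums values per key between flush markers;
// the end of the input acts as the final flush (the "Complete" case).
static List<List<(int Key, int Value)>> ExpectedBatches(IEnumerable<(int Key, int Value)?> items)
{
    var result = new List<List<(int Key, int Value)>>();
    var current = new List<(int Key, int Value)>();

    void Emit()
    {
        result.Add(current
            .GroupBy(i => i.Key)
            .Select(g => (g.Key, Value: g.Sum(i => i.Value)))
            .ToList());
        current = new List<(int Key, int Value)>();
    }

    foreach (var item in items)
    {
        if (item is { } message)
            current.Add(message); // regular message: remember it until the next flush
        else
            Emit();               // null = flush marker: close the current batch
    }

    Emit(); // completion closes the last batch
    return result;
}
```

Any Rx pipeline for this problem should produce the same batches as this model for the same input.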

What I've tried

I started with GroupBy + Aggregate + Buffer(flush), as that seemed an intuitive way to achieve the required behavior. However, the pipeline does not emit intermediate results; instead, all the aggregates come out at the end as the final output.

Here's the full code:

using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();

var completion = StartProcessing(source, flush);

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));

flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected

source.OnNext((2, 1));
source.OnNext((2, 5));

source.OnCompleted(); // emit of [(2,6)] is expected

await completion;

return;

static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
    return source
        .GroupBy
        (
            keySelector: message => message.Key
        )
        .SelectMany
        (
            group => group
                .Aggregate
                (
                    seed: (group.Key, Value: 0),
                    accumulator: (output, item) => (output.Key, output.Value + item.Value)
                )
        )
        .Buffer(flush)
        .Select(buffer => Observable.FromAsync(() => Flush(buffer)))
        .Merge()
        .ToTask();
}

static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
    Console.WriteLine($"Flushing [{string.Join(", ", data)}]");

    // processing takes time
    await Task.Delay(TimeSpan.FromSeconds(1));
}

The output is:

Flushing []
Flushing [(1, 4), (2, 9)]

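As far as I understand (not sure, though), Buffer does not receive any values until the source completes: each group from GroupBy only completes when the source completes, and Aggregate emits its result only when its group completes, so no intermediate values are propagated.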
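A minimal sketch of that behavior, assuming the System.Reactive NuGet package: Aggregate emits a single value and only when its source completes, while Scan emits a running total for every element. A Subject stands in for one GroupBy group; since such a group never completes before the source does, Buffer sees nothing at flush time, which matches the `Flushing []` line.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumes the System.Reactive NuGet package.
var group = new Subject<int>();   // stands in for one group produced by GroupBy
var aggregated = new List<int>();
var scanned = new List<int>();

// Aggregate: one value, emitted only when the source completes.
group.Aggregate(0, (sum, x) => sum + x).Subscribe(aggregated.Add);
// Scan: the running total, emitted after every element.
group.Scan(0, (sum, x) => sum + x).Subscribe(scanned.Add);

group.OnNext(1);
group.OnNext(3);
var aggregatedBeforeCompletion = aggregated.Count; // nothing yet from Aggregate
Console.WriteLine($"Before completion: Aggregate [{string.Join(", ", aggregated)}], Scan [{string.Join(", ", scanned)}]");

group.OnCompleted();
Console.WriteLine($"After completion:  Aggregate [{string.Join(", ", aggregated)}], Scan [{string.Join(", ", scanned)}]");
```

Scan alone does not solve the problem (it never resets on flush), but the contrast shows why the Aggregate-based pipeline only produces output at the very end.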

I also tried using GroupByUntil instead of GroupBy and Buffer. With that structure I do get intermediate output when each group closes, but each group's aggregate is emitted separately, and it is unclear to me how to bundle them into one batch per flush:

Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)

So far I'm stuck and looking for help, specifically:

  • what is the correct (idiomatic Rx) way of implementing the required behavior?
  • can it be achieved with built-in operators, or should I create a custom one (e.g. a custom AggregateUntil)?

Update

This alternative solution seems to work:

static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
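    return source
        .Window(flush) // each window completes when flush emits (or when the source completes)
        .SelectMany(window => window
            .Aggregate
            (
                seed: new Dictionary<int, int>(), // evaluated once per window, so sums do not leak between windows
                accumulator: (output, item) =>
                {
                    output[item.Key] = output.TryGetValue(item.Key, out var value)
                        ? value + item.Value
                        : item.Value;

                    return output;
                }
            )
            // Aggregate emits one dictionary when the window completes; hand it to Flush
            .SelectMany(sums => Observable
                .FromAsync(() => Flush(sums.Select(x => (x.Key, x.Value)))))
        )
        .ToTask();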
}

Explanation

  • Window splits the source into consecutive windows (inner observables). The current window completes whenever the "flush" observable emits, and a new one opens. This is like TakeUntil, but it produces a new window after every flush instead of ending the sequence.
  • Aggregate performs the accumulation. When its window completes (on flush, or when the source completes), it emits a single dictionary with the per-key sums.
  • Observable.FromAsync creates a deferred observable that starts the task only when it is subscribed to; here it receives the dictionary of sums and starts processing. FromAsync returns IObservable<Unit>, which emits a single value when the task completes.
  • SelectMany flattens a sequence of observables into a single observable sequence. It replaces Select + Merge.
  • Finally, ToTask returns a Task that completes when the whole sequence completes (with the last Unit received), i.e. after all processing tasks have finished.
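The Window + Aggregate combination could be packaged as a reusable helper, roughly the AggregateUntil the question asks about. This is a sketch assuming the System.Reactive NuGet package; `AggregateUntil` and `seedFactory` are hypothetical names. It is written as a static local function so the sample is a single top-level-statements file; in a library it would more naturally be an extension method. The seed is passed as a factory because a single mutable Dictionary reused for every window would carry sums over from one window to the next.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumes the System.Reactive NuGet package. "AggregateUntil" is a hypothetical helper name.
var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var emitted = new List<string>();

AggregateUntil(
        source,
        flush,
        seedFactory: () => new Dictionary<int, int>(),
        accumulator: (sums, item) =>
        {
            sums[item.Key] = sums.GetValueOrDefault(item.Key) + item.Value;
            return sums;
        })
    // Sort by key so the printed order does not depend on dictionary enumeration order
    .Subscribe(sums => emitted.Add(
        $"[{string.Join(", ", sums.OrderBy(kv => kv.Key).Select(kv => (kv.Key, kv.Value)))}]"));

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default);
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted();

emitted.ForEach(Console.WriteLine);

// Window closes the current window (and opens a new one) whenever `closing` emits;
// Aggregate emits one accumulated value per window when that window completes.
// seedFactory runs once per window, so a mutable seed is never shared between windows.
static IObservable<TAccumulate> AggregateUntil<TSource, TAccumulate, TSignal>(
    IObservable<TSource> source,
    IObservable<TSignal> closing,
    Func<TAccumulate> seedFactory,
    Func<TAccumulate, TSource, TAccumulate> accumulator) =>
    source
        .Window(closing)
        .SelectMany(window => window.Aggregate(seedFactory(), accumulator));
```

This suggests the behavior can be built from existing operators; a hand-written operator does not seem necessary.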

Solution

  • Updated:

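    I rewrote your example. Instead of aggregating inside the stream, Buffer(flush) collects the raw messages between flushes, and each buffer is then grouped and summed with plain LINQ. It looks like it suits your needs: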

        static async Task Main()
        {
            var source = new Subject<(int Key, int Value)>();
            var flush = new Subject<Unit>();
    
            var completion = StartProcessing(source, flush);
    
            source.OnNext((1, 1));
            source.OnNext((2, 3));
            source.OnNext((1, 3));
    
            flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
    
            source.OnNext((2, 1));
            source.OnNext((2, 5));
    
            source.OnCompleted(); // emit of [(2,6)] is expected
    
            await completion;
    
            return;
        }
    
        static Task StartProcessing(
            IObservable<(int Key, int Value)> source,
            IObservable<Unit> flush)
        {
            return source
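                .Buffer(flush) // on each flush, emits the messages collected so far; on completion, emits the rest
                .Select(list => list
                    .GroupBy(i => i.Key)
                    .Select(group => (group.Key, Value: group.Sum(i => i.Value))))
                .Select(batch => Observable.FromAsync(() => Flush(batch)))
                .Merge()
                .ToTask();
        }
    
        // Takes the whole batch, so that each flush prints a single line
        static async Task Flush(IEnumerable<(int Key, int Value)> data)
        {
            Console.WriteLine($"Flushing [{string.Join(", ", data)}]");
    
            // processing takes time
            await Task.Delay(TimeSpan.FromSeconds(1));
        }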
    

    Produces:

    Flushing [(1, 4), (2, 3)]
    Flushing [(2, 6)]
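One detail to be aware of in the Buffer version: Merge subscribes to each FromAsync as soon as its batch arrives, so when a flush is followed quickly by another flush or by completion, the next Flush would likely start while the previous one is still running. If batches must be processed one at a time, a variant could use Concat instead of Merge, as in the sketch below (assuming the System.Reactive NuGet package; the `running`/`maxRunning` counters are only there to observe overlap). Note also that Buffer keeps every raw message until the next flush, whereas the Window + Aggregate version in the question keeps only the per-key sums.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

// Assumes the System.Reactive NuGet package and a top-level-statements console app.
var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var flushed = new List<string>();
var running = 0;
var maxRunning = 0;

var completion = source
    .Buffer(flush) // raw messages collected between flushes (and up to completion)
    .Select(list => list
        .GroupBy(i => i.Key)
        .Select(group => (group.Key, Value: group.Sum(i => i.Value)))
        .ToList())
    .Select(batch => Observable.FromAsync(() => Flush(batch)))
    .Concat() // subscribe to (i.e. start) the next Flush only after the previous one completed
    .ToTask();

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default);
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted();

await completion;
Console.WriteLine($"Max concurrent Flush calls: {maxRunning}");

async Task Flush(List<(int Key, int Value)> data)
{
    // Track how many Flush calls overlap in time.
    maxRunning = Math.Max(maxRunning, Interlocked.Increment(ref running));
    var line = $"[{string.Join(", ", data)}]";
    flushed.Add(line);
    Console.WriteLine($"Flushing {line}");

    await Task.Delay(TimeSpan.FromMilliseconds(100)); // simulated processing time
    Interlocked.Decrement(ref running);
}
```

Whether sequential processing is preferable depends on what Flush does; Merge gives more throughput, Concat gives strict one-at-a-time ordering.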