Tags: c#, asynchronous, system.reactive, state-machine, rx.net

How to implement a ScanAsync operator with an async accumulator in Rx.NET?


The Scan operator in Rx.NET has the signature:

public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator);

The accumulator has the type:

Func<TAccumulate, TSource, TAccumulate> accumulator

While trying to implement a state machine model with async state transitions, I found that a ScanAsync operator with the following signature would be helpful:

public static IObservable<TAccumulate> ScanAsync<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator);

The accumulator has the signature:

Func<TAccumulate, TSource, Task<TAccumulate>> accumulator

Ideally, application code would look something like this (the same as with the normal Scan operator, except that the accumulator is async):

IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
    initialState, 
    async (previousState, evt) => {
        var newState = await transitionAsync(previousState, evt);
        return newState;
    });

It seems Microsoft is developing AsyncRx.NET, but it has not been released yet (and there is no schedule).


Related things:

If I model the async state machine with a BehaviorSubject for the states and subscribe to the events observable, as in the following code:

IObservable<TEvent> events;
BehaviorSubject<State> states = new BehaviorSubject<State>(initialState);
events.Subscribe(async e => {
    var newState = await transition(states.Value, e);
    states.OnNext(newState);
});

I suspect there might be race conditions in certain cases, for example when a new event arrives while the previous transition is still being awaited.
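To make the suspected race concrete, here is a minimal sketch, assuming the System.Reactive package and integer states and events. `RaceDemo`, `TransitionAsync`, and the delay values are hypothetical names and numbers chosen for illustration. Because the source emits all events synchronously, each handler reads `states.Value` before any earlier transition has published its result.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class RaceDemo
{
    // Hypothetical transition: adds the event to the state after a simulated delay.
    private static async Task<int> TransitionAsync(int state, int evt)
    {
        await Task.Delay(100);
        return state + evt;
    }

    // Returns the "previous state" that each event handler actually read.
    public static async Task<IReadOnlyList<int>> RunAsync()
    {
        var states = new BehaviorSubject<int>(0);
        var statesRead = new List<int>();
        IObservable<int> events = Observable.Range(1, 3);

        // The async lambda becomes an async void handler: Rx does not wait
        // for it, so the next event is delivered while this one is pending.
        events.Subscribe(async e =>
        {
            int previous = states.Value; // read before earlier transitions finish
            statesRead.Add(previous);
            var newState = await TransitionAsync(previous, e);
            states.OnNext(newState);
        });

        // Assumption: 500 ms is long enough for all three transitions to finish.
        await Task.Delay(500);
        return statesRead;
    }
}
```

Under these assumptions every handler should read the initial state 0, so the published states are 1, 2, and 3 in whatever order the transitions complete, rather than the accumulated 1, 3, 6 that a sequential state machine would produce.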

I've tried implementing it like this:

IObservable<TS> ScanAsync<TS, TE>(
    IObservable<TE> source,
    Func<TS, TE, Task<TS>> reducer,
    TS initialState)
{
    var states = from m in source.Take(1)
                 from nextState in reducer(initialState, m).ToObservable()
                 from s in ScanAsync(source.Skip(1), reducer, nextState)
                 select s;
    return Observable.Return(initialState).Concat(states);
}

However, it sometimes works and sometimes just blocks, and I have no idea what is causing this.


Solution

  • You could use the Scan operator to create an intermediate IObservable<Task<TAccumulate>>, which can then be flattened by using the Concat operator:

    public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(
        this IObservable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
    {
        return source.Scan(Task.FromResult(seed), async (previousTask, item) =>
        {
            return await accumulator(await previousTask, item);
        }).Concat();
    }
    

    The implementation above uses the Concat overload that accepts an observable of tasks, instead of a nested observable:

    // Concatenates all task results, as long as the previous task terminated successfully.
    public static IObservable<TSource> Concat<TSource>(
        this IObservable<Task<TSource>> sources);
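As a usage illustration, the following is a minimal sketch, assuming the System.Reactive package and integer states and events. It repeats the technique from the answer under the name `ScanAsync` (the name used in the question; the answer itself calls the method `Scan`), and `ScanAsyncDemo` and `TransitionAsync` are hypothetical names introduced only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ScanAsyncExtensions
{
    // Same Scan + Concat technique as above, renamed to ScanAsync for the demo.
    public static IObservable<TAccumulate> ScanAsync<TSource, TAccumulate>(
        this IObservable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
    {
        return source
            .Scan(Task.FromResult(seed), async (previousTask, item) =>
                // Each step awaits the previous step, so transitions run one at a time.
                await accumulator(await previousTask, item))
            .Concat(); // flattens IObservable<Task<T>> in order
    }
}

public static class ScanAsyncDemo
{
    // Hypothetical transition: adds the event to the state after a simulated delay.
    private static async Task<int> TransitionAsync(int state, int evt)
    {
        await Task.Delay(10);
        return state + evt;
    }

    // Collects every state produced for the events 1, 2, 3 with initial state 0.
    public static async Task<IList<int>> RunAsync()
    {
        IObservable<int> events = Observable.Range(1, 3);
        return await events
            .ScanAsync(0, (state, evt) => TransitionAsync(state, evt))
            .ToList();
    }
}
```

Awaiting `ScanAsyncDemo.RunAsync()` should yield the states 1, 3, 6. As with the built-in Scan operator, the seed itself is not emitted, and each transition sees the result of the one before it even though all three events arrive before the first transition finishes.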