The Scan operator in Rx.NET has the signature:
public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator);
Its accumulator is synchronous:
Func<TAccumulate, TSource, TAccumulate> accumulator
While trying to implement a state machine model with asynchronous state transitions, I found that a ScanAsync operator with the following signature would be helpful:
public static IObservable<TAccumulate> ScanAsync<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator);
Here the accumulator returns a task:
Func<TAccumulate, TSource, Task<TAccumulate>> accumulator
The ideal application code would look something like this (similar to the normal Scan operator, except that the accumulator is async):
IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
    initialState,
    async (previousState, evt) => {
        var newState = await transitionAsync(previousState, evt);
        return newState;
    });
It seems Microsoft is developing AsyncRx.NET, but it has not been released yet (and there is no schedule).
A related point: if I model the async state machine with a BehaviorSubject for the states and subscribe to the events observable, as in the following code:
IObservable<TEvent> events;
BehaviorSubject<State> states = new BehaviorSubject<State>(initialState);
events.Subscribe(async e => {
    var newState = await transition(states.Value, e);
    states.OnNext(newState);
});
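I suspect this has a race condition: the async lambda passed to Subscribe does not make the subscription wait for the transition, so a second event can start its own transition before the first one has published its new state, and both would read the same states.Value.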
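To make that suspicion concrete, here is a minimal sketch of how the BehaviorSubject approach can lose an update. The `RaceDemo` class, the `TransitionAsync` helper, and the delays are hypothetical and chosen only so that the second transition finishes before the first; the state is an `int` for simplicity.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class RaceDemo
{
    // Hypothetical transition: adds the event to the state after a delay.
    // Event 1 is made slower than event 2 so the completions arrive out of order.
    static async Task<int> TransitionAsync(int state, int evt)
    {
        await Task.Delay(evt == 1 ? 200 : 20);
        return state + evt;
    }

    public static async Task Main()
    {
        var events = new Subject<int>();
        var states = new BehaviorSubject<int>(0);

        events.Subscribe(async e =>
        {
            // states.Value is read before the first await, so both handlers
            // start from the initial state 0.
            var newState = await TransitionAsync(states.Value, e);
            states.OnNext(newState);
        });

        // Both events are pushed before either transition has completed.
        events.OnNext(1);
        events.OnNext(2);

        // Give both transitions time to finish, then inspect the final state.
        await Task.Delay(500);
        Console.WriteLine(states.Value);
    }
}
```

With these delays the transition for event 2 publishes 2 first and the transition for event 1 then overwrites it with 1, so the final state is 1 rather than the 3 that a strictly sequential scan would produce.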
I've tried implementing it with
IObservable<TS> ScanAsync<TS, TE>(
    IObservable<TE> source,
    Func<TS, TE, Task<TS>> reducer,
    TS initialState)
{
    var states = from m in source.Take(1)
                 from nextState in reducer(initialState, m).ToObservable()
                 from s in ScanAsync(source.Skip(1), reducer, nextState)
                 select s;
    return Observable.Return(initialState).Concat(states);
}
However, it sometimes works and sometimes just blocks, and I have no idea what is causing this.
You could use the Scan operator to create an intermediate IObservable<Task<TAccumulate>>, which can then be flattened with the Concat operator:
public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(
    this IObservable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
{
    return source.Scan(Task.FromResult(seed), async (previousTask, item) =>
    {
        return await accumulator(await previousTask, item);
    }).Concat();
}
The implementation above uses the Concat overload that accepts an observable of tasks, instead of a nested observable:
// Concatenates all task results, as long as the previous task terminated successfully.
public static IObservable<TSource> Concat<TSource>(
    this IObservable<Task<TSource>> sources);
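As a usage sketch, the operator can be tried in a small console program. The class name `ScanAsyncDemo`, the method name `ScanAsync` (used here instead of `Scan` to match the name in the question), and the summing accumulator with its `Task.Delay` are assumptions made for illustration; the operator body is the same Scan plus Concat combination shown above, and the program assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ScanAsyncDemo
{
    // Same technique as above: each task awaits the previous task's result,
    // and Concat emits the task results in order.
    public static IObservable<TAccumulate> ScanAsync<TSource, TAccumulate>(
        this IObservable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
    {
        return source.Scan(Task.FromResult(seed), async (previousTask, item) =>
        {
            return await accumulator(await previousTask, item);
        }).Concat();
    }

    public static async Task Main()
    {
        // Hypothetical async transition: a running sum with a simulated delay.
        var states = await Observable.Range(1, 4)
            .ScanAsync(0, async (sum, x) =>
            {
                await Task.Delay(10);
                return sum + x;
            })
            .ToList();

        Console.WriteLine(string.Join(", ", states)); // prints "1, 3, 6, 10"
    }
}
```

As with the built-in Scan, the seed itself is not emitted; if the initial state should appear in the output, it can be prepended with StartWith.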