Search code examples
c#.netsystem.reactiverx.net

How can I implement an exhaustMap handler in Rx.Net?


I am looking for something similar to the exhaustMap operator from rxjs, but RX.NET does not seem to have such an operator.

What I need to achieve is that, upon every element of the source stream, I need to start an async handler, and until it finishes, I would like to drop any elements from the source. As soon as the handler finishes, resume taking elements.

What I don't want is to start an async handler upon every element - while the handler runs, I want to drop source elements.

I also suspect I need to cleverly use the defer operator here?

Thank you!


Solution

  • Here is an implementation of the ExhaustMap operator. The source observable is projected to an IObservable<(Task<TResult>, int)>, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged operator, and finally the observable is flattened with the Concat operator.

    /// <summary>Invokes an asynchronous function for each element of an observable
    /// sequence, ignoring elements that are emitted before the completion of an
    /// asynchronous function of a preceding element.</summary>
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> function)
    {
        return source
            .Scan((Task: Task.FromResult<TResult>(default), Id: 0), (previous, item) =>
                !previous.Task.IsCompleted ? previous : (function(item), unchecked(previous.Id + 1)))
            .DistinctUntilChanged()
            .Select(e => e.Task)
            .Concat();
    }
    

    The tasks returned by the function are not guaranteed to be distinct. For example the method async Task<T> Return<T>(T result) => result; returns always the same Task for result = 1 or result = false. Hence the need for the incremented Id in the above implementation, that individualizes the tasks, so that the DistinctUntilChanged doesn't filter out tasks from separate function invocations.

    Usage example:

    Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(x => (int)x + 1)
        .Take(10)
        .Do(x => Console.WriteLine($"Produced #{x}"))
        .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
        .Do(x => Console.WriteLine($"--Result: {x}"))
        .Wait();
    

    Output:

    Produced #1
    --Result: 1
    Produced #2
    --Result: 2
    Produced #3
    Produced #4
    Produced #5
    --Result: 3
    Produced #6
    Produced #7
    Produced #8
    --Result: 6
    Produced #9
    Produced #10
    

    Online demo.


    Here is an alternative implementation of the ExhaustMap, where the function produces an IObservable<TResult> instead of a Task<TResult>:

    /// <summary>Projects each element to an observable sequence, which is merged
    /// in the output observable sequence only if the previous projected observable
    /// sequence has completed.</summary>
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> function)
    {
        return Observable.Defer(() =>
        {
            int mutex = 0; // 0: not acquired, 1: acquired
            return source.SelectMany(item =>
            {
                // Attempt to acquire the mutex immediately. If successful, return
                // a sequence that releases the mutex when terminated. Otherwise,
                // return immediately an empty sequence.
                if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                    return function(item).Finally(() => Volatile.Write(ref mutex, 0));
                return Observable.Empty<TResult>();
            });
        });
    }