c# asynchronous system.reactive rx.net

How to implement a custom SelectMany operator that waits for all observable subsequences to complete?


I am using the SelectMany operator to project the elements of an observable sequence to tasks and propagate the results of those tasks. Everything is fine if all operations succeed, but I don't like that, in case of an exception, all currently running operations become fire-and-forget. I would prefer to wait until all pending operations have completed, and only then be notified of the error (or errors) that occurred. Here is a minimal example of the behavior that I would like to avoid:

try
{
    await Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => (int)x + 1)
        .Take(5)
        .SelectMany(x => Observable.FromAsync(async ct =>
        {
            await Task.Delay(500); // Simulate an I/O operation
            if (x == 3) throw new ApplicationException("Oops!");
            Console.WriteLine($"Operation #{x} completed");
            return x;
        }))
        .Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
    Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}
await Task.Delay(1000);

Output (undesirable):

Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Failed, ApplicationException: Oops!
Operation #4 completed
Operation #5 completed


The desirable output should look like this:

Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Operation #4 completed
Result: 4
Operation #5 completed
Result: 5
Failed, AggregateException: One or more errors occurred. (Oops!)

The sequence should propagate the exception of item #3 only after all other running operations (#4 and #5) have completed.

In the above example I have intentionally left the CancellationToken argument of the Observable.FromAsync method unused, because I want to simulate a situation where the launched asynchronous operations are not cancelable, or their cancellation is not instantaneous.
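For contrast, here is a minimal sketch of what the unused token is for. It assumes the System.Reactive package is referenced, and the FromAsyncTokenDemo class and its Run method are hypothetical names introduced only for illustration. The sketch relies on Observable.FromAsync canceling the token it hands to the delegate when the subscription is disposed, which is what happens to the pending subsequences when SelectMany propagates an error.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FromAsyncTokenDemo
{
    // Hypothetical helper: reports whether the token handed to the delegate
    // was canceled before and after the subscription was disposed.
    public static (bool Before, bool After) Run()
    {
        CancellationToken captured = default;

        IDisposable subscription = Observable
            .FromAsync(async ct =>
            {
                captured = ct; // the delegate starts running during Subscribe
                await Task.Delay(Timeout.Infinite, ct); // a cancelable "I/O operation"
                return 0;
            })
            .Subscribe(_ => { }, _ => { });

        bool before = captured.IsCancellationRequested;
        subscription.Dispose(); // unsubscribing signals the token
        bool after = captured.IsCancellationRequested;
        return (before, after);
    }
}
```

An operation that ignores the token, like the one in the question, simply keeps running after the unsubscription, and its result or error is no longer observed.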

I am thinking about implementing a custom operator SelectManyUntilCompletion with the signature below:

public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, IObservable<TResult>> selector);

The signature is almost identical to that of SelectMany. The only difference is that the selector delegate has a CancellationToken parameter. This token should be canceled when an error occurs in any other subsequence. The idea is that instead of unsubscribing abruptly from the subsequences, the operator should communicate the cancellation but remain subscribed to the subsequences until their natural completion.

My question is: how can I implement this operator? Is it possible to implement it on top of the existing SelectMany, or should I go to a lower level and use Observable.Create?

Below is the detailed description of the operator. In case of success its behavior should be identical to that of SelectMany, so I am only describing its behavior in case of errors.

  1. The resulting sequence should complete when all subsequences have completed.
  2. The source sequence should be unsubscribed immediately after an error has occurred, so that no more subsequences are created.
  3. The CancellationToken should be signaled immediately after an error has occurred.
  4. The resulting sequence should propagate all the TResult values that have been produced by all subsequences before and after the error.
  5. The resulting sequence should finally propagate all the errors that have occurred, bundled in an AggregateException. This includes a possible error of the source sequence.
  6. The AggregateException should not include any OperationCanceledExceptions that may have occurred as a result of canceling the CancellationToken.

Marble diagram:

Source:        +-----A-----B-------------C---------------D----|
Subsequence-A:       +-------------a---------|
Subsequence-B:             +---b---------------------X
Subsequence-C:                           +-------c----------------c----|
Subsequence-D:
Result:        +---------------b---a-------------c----------------c----X

Subsequence D was never subscribed to, because D was emitted after subsequence B had failed.

The marble diagram indicates that the subsequence C did not respond promptly to the cancellation signal, which is a valid scenario.


Solution

  • Here is one solution to this problem. The implementation below is based on the SelectMany operator. All the involved observable sequences have their errors suppressed with a Catch+Empty combo. The errors are aggregated in a ConcurrentQueue<Exception>, and are thrown from a final Concat+Defer combo.

    /// <summary>
    /// Projects each element of the source observable sequence to a subsequence,
    /// and merges the resulting subsequences into one observable sequence.
    /// The merged sequence completes when all the projected subsequences complete
    /// on their own. Unlike the SelectMany operator, the subsequences are not
    /// unsubscribed when an error occurs.
    /// </summary>
    public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, CancellationToken, IObservable<TResult>> selector)
    {
        return Observable.Defer(() =>
        {
            var cts = new CancellationTokenSource();
            var errors = new ConcurrentQueue<Exception>();
            var stopSignal = new Subject<Unit>();
            var stopSignalSynchronized = Observer.Synchronize(stopSignal);
                
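            // Records the error, requests cancellation, stops the source,
            // and replaces the failed sequence with an empty one.
            IObservable<T> HandleErrorReturnEmpty<T>(Exception ex)
            {
                // Read the flag before canceling; otherwise it is always true here,
                // and an OperationCanceledException that arrives first is dropped.
                bool ignoreError = ex is OperationCanceledException
                    && cts.IsCancellationRequested;
                cts.Cancel();
                if (!ignoreError) errors.Enqueue(ex);
                stopSignalSynchronized.OnNext(default);
                return Observable.Empty<T>();
            }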
    
            return source
                .TakeUntil(stopSignal)
                .Catch((Exception ex) => HandleErrorReturnEmpty<TSource>(ex))
                .SelectMany(item =>
                {
                    if (!errors.IsEmpty) return Observable.Empty<TResult>();
                    IObservable<TResult> projected;
                    try { projected = selector(item, cts.Token); }
                    catch (Exception ex) { return HandleErrorReturnEmpty<TResult>(ex); }
                    return projected
                        .Catch((Exception ex) => HandleErrorReturnEmpty<TResult>(ex));
                })
                .Concat(Observable.Defer(() =>
                {
                    cts.Dispose();
                    if (!errors.IsEmpty) throw new AggregateException(errors);
                    return Observable.Empty<TResult>();
                }));
        });
    }
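The two combos used by the operator can be seen in isolation in the sketch below. It assumes the System.Reactive package is referenced; the DeferredErrorDemo class and its Run method are hypothetical names. The error is swallowed by Catch+Empty and recorded in the queue, and the trailing Concat+Defer rethrows it only after the preceding sequence has completed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DeferredErrorDemo
{
    // Hypothetical helper: returns the values observed and the final error, if any.
    public static (List<int> Values, Exception Error) Run()
    {
        var errors = new ConcurrentQueue<Exception>();
        var values = new List<int>();
        Exception error = null;

        Observable.Range(1, 3)
            .Concat(Observable.Throw<int>(new InvalidOperationException("Oops!")))
            // Catch+Empty: record the error and end this part of the sequence quietly.
            .Catch((Exception ex) =>
            {
                errors.Enqueue(ex);
                return Observable.Empty<int>();
            })
            // Concat+Defer: runs only after the sequence above has completed.
            // An exception thrown by the Defer factory is delivered as OnError.
            .Concat(Observable.Defer(() =>
            {
                if (!errors.IsEmpty) throw new AggregateException(errors);
                return Observable.Empty<int>();
            }))
            .Subscribe(values.Add, ex => error = ex);

        return (values, error);
    }
}
```

In this sketch the subscriber receives all three values first, and the InvalidOperationException arrives last, wrapped in an AggregateException.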
    

    In case of an error, a stopping signal is propagated through a synchronized Subject<Unit>, and observed by a TakeUntil operator which is chained to the source.
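The stop-signal mechanism can also be tried on its own. The following is a minimal sketch, assuming the System.Reactive package is referenced; the StopSignalDemo class and its Run method are hypothetical names. Subjects stand in for the source and for the error handler that raises the signal.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class StopSignalDemo
{
    // Hypothetical helper: returns the values received and whether the
    // sequence completed once the stop signal was raised.
    public static (List<int> Received, bool Completed) Run()
    {
        var source = new Subject<int>();
        var stopSignal = new Subject<Unit>();
        // Serializes OnNext calls that may arrive from several threads at once.
        var stopSignalSynchronized = Observer.Synchronize(stopSignal);

        var received = new List<int>();
        bool completed = false;

        source
            .TakeUntil(stopSignal)
            .Subscribe(received.Add, () => completed = true);

        source.OnNext(1);
        source.OnNext(2);
        stopSignalSynchronized.OnNext(Unit.Default); // what an error handler would do
        source.OnNext(3); // dropped: TakeUntil has already ended the sequence

        return (received, completed);
    }
}
```

Raising the signal completes the TakeUntil sequence instead of failing it, which is why the operator has to collect the errors separately and rethrow them at the end.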

    Usage example:

    //...
    .SelectManyUntilCompletion((item, token) => Observable.FromAsync(async () =>
    {
        //...
    }))
    //...
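For a complete run, the sketch below applies the operator to the scenario from the question. It assumes the System.Reactive package is referenced. The SelectManyUntilCompletionDemo class, the RunAsync method and the honorToken parameter are hypothetical names, and the operator is restated in condensed form only so that the sample compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SelectManyUntilCompletionDemo
{
    // Condensed restatement of the operator, so that the sample is self-contained.
    public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, CancellationToken, IObservable<TResult>> selector)
    {
        return Observable.Defer(() =>
        {
            var cts = new CancellationTokenSource();
            var errors = new ConcurrentQueue<Exception>();
            var stopSignal = new Subject<Unit>();
            var stopSignalSynchronized = Observer.Synchronize(stopSignal);

            IObservable<T> HandleErrorReturnEmpty<T>(Exception ex)
            {
                // Assumption: the flag is read before canceling, so that only
                // cancellations caused by an earlier error are ignored.
                bool ignoreError = ex is OperationCanceledException
                    && cts.IsCancellationRequested;
                cts.Cancel();
                if (!ignoreError) errors.Enqueue(ex);
                stopSignalSynchronized.OnNext(default);
                return Observable.Empty<T>();
            }

            return source
                .TakeUntil(stopSignal)
                .Catch((Exception ex) => HandleErrorReturnEmpty<TSource>(ex))
                .SelectMany(item =>
                {
                    if (!errors.IsEmpty) return Observable.Empty<TResult>();
                    IObservable<TResult> projected;
                    try { projected = selector(item, cts.Token); }
                    catch (Exception ex) { return HandleErrorReturnEmpty<TResult>(ex); }
                    return projected
                        .Catch((Exception ex) => HandleErrorReturnEmpty<TResult>(ex));
                })
                .Concat(Observable.Defer(() =>
                {
                    cts.Dispose();
                    if (!errors.IsEmpty) throw new AggregateException(errors);
                    return Observable.Empty<TResult>();
                }));
        });
    }

    // Runs the scenario from the question. honorToken decides whether the
    // simulated I/O operation observes the cancellation token or ignores it.
    public static async Task<(List<int> Results, Exception Error)> RunAsync(bool honorToken)
    {
        var results = new ConcurrentQueue<int>();
        Exception error = null;
        try
        {
            await Observable
                .Interval(TimeSpan.FromMilliseconds(100))
                .Select(x => (int)x + 1)
                .Take(5)
                .SelectManyUntilCompletion((x, token) => Observable.FromAsync(async () =>
                {
                    await Task.Delay(500, honorToken ? token : CancellationToken.None);
                    if (x == 3) throw new ApplicationException("Oops!");
                    return x;
                }))
                .Do(x => results.Enqueue(x));
        }
        catch (Exception ex) { error = ex; }
        return (new List<int>(results), error);
    }
}
```

With honorToken set to false, the run should reproduce the desirable output from the question: results 1, 2, 4 and 5, followed by an AggregateException that wraps the "Oops!" error. With honorToken set to true, operations #4 and #5 are canceled as soon as #3 fails, so only results 1 and 2 are expected, and the resulting cancellation exceptions are left out of the AggregateException. Both outcomes depend on the timings used in the question.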