I am using the SelectMany operator to project the elements of an observable sequence to tasks and propagate the results of those tasks. Everything works as long as all operations succeed, but when an exception occurs, all currently running operations become fire-and-forget, which I don't like. I would prefer to wait until all pending operations have completed, and only then be notified of the error (or errors) that occurred. Here is a minimal example of the behavior that I would like to avoid:
try
{
    await Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => (int)x + 1)
        .Take(5)
        .SelectMany(x => Observable.FromAsync(async ct =>
        {
            await Task.Delay(500); // Simulate an I/O operation
            if (x == 3) throw new ApplicationException("Oops!");
            Console.WriteLine($"Operation #{x} completed");
            return x;
        }))
        .Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
    Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}
await Task.Delay(1000);
Output (undesirable):
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Failed, ApplicationException: Oops!
Operation #4 completed
Operation #5 completed
The desirable output should look like this:
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Operation #4 completed
Result: 4
Operation #5 completed
Result: 5
Failed, AggregateException: One or more errors occurred. (Oops!)
The sequence should propagate the exception of the item #3 only after all other running operations (#4 and #5) have completed.
In the above example I intentionally did not use the CancellationToken argument of the Observable.FromAsync method, because I want to simulate a situation where the launched asynchronous operations are not cancelable, or their cancellation is not instantaneous.
I am thinking about implementing a custom operator SelectManyUntilCompletion with the signature below:
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, IObservable<TResult>> selector);
The signature is almost identical to that of SelectMany. The only difference is that the selector delegate has a CancellationToken parameter. This token should be canceled when an error has occurred in any other subsequence. The idea is that instead of unsubscribing abruptly from the subsequences, the operator should communicate the cancellation but remain subscribed to the subsequences until their natural completion.
My question is: how can I implement this operator? Is it possible to implement it on top of the existing SelectMany, or should I go to a lower level and use Observable.Create?
Below is the detailed description of the operator. In case of success its behavior should be identical to that of SelectMany, so I am only describing its behavior in case of errors.
- The source sequence should be unsubscribed immediately after an error has occurred, so that no more subsequences are created.
- The CancellationToken should be signaled immediately after an error has occurred.
- The resulting sequence should propagate all the TResult values that have been produced by all subsequences before and after the error.
- The resulting sequence should finally propagate all the errors that have occurred, bundled in an AggregateException. This includes a possible error of the source sequence.
- The AggregateException should not include any OperationCanceledExceptions that may have occurred as a result of canceling the CancellationToken.

Marble diagram:
Source:        +-----A-----B-------------C---------------D----|
Subsequence-A:       +-------------a---------|
Subsequence-B:             +---b---------------------X
Subsequence-C:                           +-------c----------------c----|
Subsequence-D:
Result:        +---------------b---a-------------c----------------c----X
The subsequence D was not subscribed because it was emitted after the failure of the subsequence B.
The marble diagram indicates that the subsequence C did not respond promptly to the cancellation signal, which is a valid scenario.
Here is one solution to this problem. The implementation below is based on the SelectMany operator. All the involved observable sequences have their errors suppressed with a Catch+Empty combo. The errors are aggregated in a ConcurrentQueue<Exception> and thrown from a final Concat+Defer combo.
/// <summary>
/// Projects each element of the source observable sequence to a subsequence,
/// and merges the resulting subsequences into one observable sequence.
/// The merged sequence completes when all the projected subsequences complete
/// on their own. Unlike the SelectMany operator, the subsequences are not
/// unsubscribed when an error occurs.
/// </summary>
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, IObservable<TResult>> selector)
{
    return Observable.Defer(() =>
    {
        var cts = new CancellationTokenSource();
        var errors = new ConcurrentQueue<Exception>();
        var stopSignal = new Subject<Unit>();
        var stopSignalSynchronized = Observer.Synchronize(stopSignal);

        IObservable<T> HandleErrorReturnEmpty<T>(Exception ex)
        {
            cts.Cancel();
            bool ignoreError = ex is OperationCanceledException
                && cts.IsCancellationRequested;
            if (!ignoreError) errors.Enqueue(ex);
            stopSignalSynchronized.OnNext(default);
            return Observable.Empty<T>();
        }

        return source
            .TakeUntil(stopSignal)
            .Catch((Exception ex) => HandleErrorReturnEmpty<TSource>(ex))
            .SelectMany(item =>
            {
                if (!errors.IsEmpty) return Observable.Empty<TResult>();
                IObservable<TResult> projected;
                try { projected = selector(item, cts.Token); }
                catch (Exception ex) { return HandleErrorReturnEmpty<TResult>(ex); }
                return projected
                    .Catch((Exception ex) => HandleErrorReturnEmpty<TResult>(ex));
            })
            .Concat(Observable.Defer(() =>
            {
                cts.Dispose();
                if (!errors.IsEmpty) throw new AggregateException(errors);
                return Observable.Empty<TResult>();
            }));
    });
}
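To see the error handling in isolation, here is a minimal sketch of the Catch+Empty and Concat+Defer combos applied to a single sequence. It is not part of the operator: the failing sequence, the exception type, and the printed messages are made up for illustration, and the snippet assumes a C# top-level program with the System.Reactive package referenced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;

var errors = new ConcurrentQueue<Exception>();

var sequence = Observable
    .Range(1, 3)
    // Hypothetical failure appended after the three values.
    .Concat(Observable.Throw<int>(new InvalidOperationException("Oops!")))
    .Catch((Exception ex) =>
    {
        errors.Enqueue(ex);             // Remember the error...
        return Observable.Empty<int>(); // ...and complete normally instead.
    })
    .Concat(Observable.Defer(() =>
    {
        // This factory runs only after the sequence above has completed.
        // An exception thrown here is delivered to the subscriber as OnError.
        if (!errors.IsEmpty) throw new AggregateException(errors);
        return Observable.Empty<int>();
    }));

sequence.Subscribe(
    x => Console.WriteLine($"Value: {x}"),
    ex => Console.WriteLine($"Failed, {ex.GetType().Name}"),
    () => Console.WriteLine("Completed"));
```

Because the error is stored instead of propagated, the subscriber sees it only after every value has been delivered, wrapped in an AggregateException.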
In case of an error, a stopping signal is propagated through a synchronized Subject<Unit> and observed by a TakeUntil operator that is chained to the source.
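The stopping mechanism can also be tried on its own. The sketch below is only an illustration under assumed timings: the Interval source, the delays, and the printed messages are invented, and it again assumes a C# top-level program with System.Reactive referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

var stopSignal = new Subject<Unit>();
// Subject<T> expects serialized calls, so the signal is sent through a
// synchronized wrapper that is safe to call from several threads at once.
var stopSignalSynchronized = Observer.Synchronize(stopSignal);

using var subscription = Observable
    .Interval(TimeSpan.FromMilliseconds(100)) // Hypothetical source
    .TakeUntil(stopSignal)                    // Completes on the first signal
    .Subscribe(
        x => Console.WriteLine($"Tick {x}"),
        () => Console.WriteLine("Source stopped"));

await Task.Delay(350);                        // Let a few ticks through
stopSignalSynchronized.OnNext(Unit.Default);  // Ask the source to stop
await Task.Delay(200);                        // No further ticks are expected
```

In the operator, several subsequences can fail concurrently on different threads, which is why the signal goes through the synchronized observer instead of calling the subject directly.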
Usage example:
//...
.SelectManyUntilCompletion((item, token) => Observable.FromAsync(async () =>
{
    //...
}))
//...