Tags: c#, system.reactive, rx.net

How to merge two observables with early completion


The built-in Merge operator completes only when both sources have completed. I am looking for a variant that produces an observable that completes as soon as either of the two source observables completes. For example, if the first observable completes successfully and the second observable later fails with an exception, I want that exception to be ignored.

I came up with an implementation that concatenates a special sentinel exception to both observables; the merged sequence then catches and suppresses this exception. Am I missing a simpler solution?

/// <summary>
/// Merges elements from two observable sequences into a single observable sequence,
/// that completes as soon as any of the source observable sequences completes.
/// </summary>
public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
    IObservable<T> second)
{
    var sentinel = new Exception();
    // Concat: Concatenates the second observable sequence to the first
    // observable sequence upon successful termination of the first.
    first = first.Concat(Observable.Throw<T>(sentinel));
    second = second.Concat(Observable.Throw<T>(sentinel));
    // Catch: Continues an observable sequence that is terminated by an exception
    // of the specified type with the observable sequence produced by the handler.
    return first.Merge(second)
        .Catch(handler: (Exception ex) =>
            ex == sentinel ? Observable.Empty<T>() : Observable.Throw<T>(ex));
}

Solution

  • Fun hack:

    public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
        IObservable<T> second)
    {
        return Observable.Merge(
            first.Materialize(),
            second.Materialize()
        ).Dematerialize();
    }
    

    Materialize turns each source into an observable of notifications, so a source's OnCompleted reaches Merge as an ordinary element instead of being held back until both sources have completed. Dematerialize then sees the first OnCompleted notification and completes the resulting sequence right away.

    Side note: for some fun, slightly academic reading about Materialize/Dematerialize, read this blog post. The author writes about Ix, but the same applies to Rx.
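
To check the early-completion behavior described above, here is a minimal sketch that drives the operator with two Subject<int> sources. It assumes the System.Reactive NuGet package is referenced; the Demo class, its Run method and the log list are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeExtensions
{
    // Same operator as in the answer: merge the materialized sources,
    // then dematerialize so the first OnCompleted ends the sequence.
    public static IObservable<T> MergeUntilAnyCompletes<T>(
        this IObservable<T> first, IObservable<T> second)
    {
        return Observable.Merge(
            first.Materialize(),
            second.Materialize()
        ).Dematerialize();
    }
}

// Hypothetical demo harness, not part of the original answer.
public static class Demo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var first = new Subject<int>();
        var second = new Subject<int>();

        var subscription = first.MergeUntilAnyCompletes(second).Subscribe(
            x => log.Add($"OnNext({x})"),
            ex => log.Add($"OnError({ex.Message})"),
            () => log.Add("OnCompleted"));

        first.OnNext(1);
        second.OnNext(2);
        first.OnCompleted(); // ends the merged sequence
        second.OnNext(3);    // arrives after completion, not observed
        second.OnError(new InvalidOperationException("late failure")); // ignored

        subscription.Dispose();
        return log;
    }
}
```

Calling Demo.Run() should return a log with OnNext(1), OnNext(2) and OnCompleted. The late value and the late exception from the second source never reach the observer, because Dematerialize has already completed and unsubscribed from both sources.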