c# concurrency system.reactive rx.net

How to merge multiple observables with order preservation and maximum concurrency?


I have a nested observable IObservable<IObservable<T>>, and I want to flatten it to an IObservable<T>. I don't want to use the Concat operator because it delays the subscription to each inner observable until the completion of the previous observable. This is a problem because the inner observables are cold, and I want them to start emitting T values immediately after they are emitted by the outer observable. I also don't want to use the Merge operator because it messes up the order of the emitted values. The marble diagram below shows the problematic (for my case) behavior of the Merge operator, as well as the desirable merging behavior.

Stream of observables: +----1------2-----3----|
Observable-1         :      +--A-----------------B-------|
Observable-2         :             +---C---------------------D------|
Observable-3         :                   +--E--------------------F-------|
Merge (undesirable)  : +-------A-------C----E----B-----------D---F-------|
Desirable merging    : +-------A-----------------B-------C---D------EF---|

All values emitted by the Observable-1 should precede any value emitted by the Observable-2. The same should be true with the Observable-2 and Observable-3, and so on.

What I like about the Merge operator is that it lets me configure the maximum number of concurrent subscriptions to inner observables. I would like to preserve this functionality in the custom MergeOrdered operator I am trying to implement. Here is my under-construction method:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Merge(maximumConcurrency); // How to make it ordered?
}

And here is a usage example:

var source = Observable
    .Interval(TimeSpan.FromMilliseconds(300))
    .Take(4)
    .Select(x => Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(y => $"{x + 1}-{(char)(65 + y)}")
        .Take(3));

var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

Output (undesirable):

Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C

The desirable output is:

Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C

Clarification: Regarding the ordering of the values, the values themselves are irrelevant. What matters is which inner sequence each value came from, and its position in that sequence. All values from the first inner sequence should be emitted first (in their original order), then all the values from the second inner sequence, then all the values from the third, etc.


Solution

  • I figured out a solution to this problem, using a combination of the Merge, Merge(1)¹ and Replay operators. The Merge operator enforces the concurrency policy, and the Merge(1) operator enforces the ordered sequential emission. To prevent the Merge from messing up the order of the emitted values, an extra wrapping of the inner sequences is introduced. Each inner sequence is projected to an IObservable<IObservable<T>> that immediately emits the inner sequence, and later completes when the inner sequence completes. This wrapping is implemented using the Observable.Create method:

    public static IObservable<T> MergeOrdered<T>(
        this IObservable<IObservable<T>> source,
        int maximumConcurrency = Int32.MaxValue)
    {
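        return source.Select(inner => inner.Replay(buffered => Observable
            .Create<IObservable<T>>(observer =>
        {
            // Emit the buffered inner sequence right away, so that the
            // downstream Merge(1) receives the inner sequences in order.
            observer.OnNext(buffered);
            // Complete the wrapper only when the inner sequence completes,
            // so that the Merge keeps a concurrency slot occupied until then.
            return buffered.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
        })))
        .Merge(maximumConcurrency) // Enforces the concurrency limit.
        .Merge(1); // Emits the inner sequences one after the other.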
    }
    

    The Replay operator buffers all the messages that are emitted by the inner sequences, so that they are not lost in the meantime between the subscription by the Merge, and the subscription by the Merge(1).
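
    The buffering role of Replay can be seen in isolation with a small sketch. It is not part of the solution above: the class and method names are made up for illustration, and the late subscriber is simulated by resubscribing to the shared sequence only after it has completed (IgnoreElements followed by the two-argument Concat). Under these assumptions, the late subscriber should still receive every value with Replay, and nothing with Publish, which does no buffering:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;

    // Hypothetical demo class, not part of the MergeOrdered solution.
    public static class ReplayBufferingDemo
    {
        // Subscribes to the shared sequence once, ignores its values, and
        // subscribes to it a second time only after it has completed.
        private static IObservable<int> LateResubscribe(IObservable<int> shared) =>
            shared.IgnoreElements().Concat(shared);

        // Replay buffers the values, so the late subscription can read them.
        public static IList<int> WithReplay() =>
            Observable.Range(1, 3)
                .Replay(shared => LateResubscribe(shared))
                .ToList()
                .Wait();

        // Publish does not buffer, so the late subscription only sees completion.
        public static IList<int> WithPublish() =>
            Observable.Range(1, 3)
                .Publish(shared => LateResubscribe(shared))
                .ToList()
                .Wait();
    }
    ```

    In MergeOrdered the late subscriber is the Merge(1) operator, which may subscribe to a buffered inner sequence long after the Merge has started it.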

    The funny thing is that, because of the wrapping, an intermediate IObservable<IObservable<IObservable<T>>> sequence is created. This scary thing is then unwrapped twice, first by the Merge operator and then by the Merge(1) operator.
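
    To make the nesting easier to follow, the same pipeline can be written with each intermediate stage assigned to an explicitly typed local variable. This is only a sketch that restates the method above; the names MergeOrderedStepByStep, wrapped, limited and ordered are made up for illustration:

    ```csharp
    using System;
    using System.Reactive.Linq;

    // Hypothetical restatement of MergeOrdered with explicit stage types.
    public static class MergeOrderedExplained
    {
        public static IObservable<T> MergeOrderedStepByStep<T>(
            this IObservable<IObservable<T>> source,
            int maximumConcurrency = Int32.MaxValue)
        {
            // Stage 1: each inner sequence becomes a wrapper that emits the
            // buffered inner sequence once, and completes when it completes.
            IObservable<IObservable<IObservable<T>>> wrapped = source.Select(inner =>
                inner.Replay(buffered => Observable.Create<IObservable<T>>(observer =>
                {
                    observer.OnNext(buffered);
                    return buffered.Subscribe(
                        _ => { }, observer.OnError, observer.OnCompleted);
                })));

            // Stage 2: first unwrapping. At most maximumConcurrency wrappers
            // (and therefore inner sequences) are subscribed at the same time.
            IObservable<IObservable<T>> limited = wrapped.Merge(maximumConcurrency);

            // Stage 3: second unwrapping. The buffered inner sequences are
            // drained one at a time, in the order they were started.
            IObservable<T> ordered = limited.Merge(1);

            return ordered;
        }
    }
    ```

    The behavior should be the same as that of MergeOrdered; the only difference is that the compiler now checks the stated type of each stage.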

    This is not a perfectly efficient solution, because there is no reason for the inner sequence currently subscribed by the Merge(1) to be buffered. Optimizing this inefficiency is not trivial though, so I'll leave it as is. In scenarios where each subsequence contains a small number of elements, the impact of this flaw should be negligible. Attempting to fix it could even result in doing more harm than good in these scenarios.

    ¹ Ideally I would like to use the Concat instead of the equivalent but less efficient Merge(1) operator. Unfortunately the Concat operator behaves weirdly in the current version of the Rx library (5.0.0). I even encountered a deadlock when using the Concat in a fairly complex query, which was resolved by switching to the Merge(1) operator.


    Note: The original implementation of this answer, featuring a SemaphoreSlim for controlling the concurrency instead of the Merge operator, can be found in the 1st revision. The Merge-based implementation should be better, because it doesn't involve fire-and-forget task continuations, and the subscription to the inner sequences happens synchronously, instead of being offloaded to the ThreadPool.