Search code examples
c#concurrencysystem.reactiverx.net

How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity?


I noticed that the Rx Merge operator accepts an optional maxConcurrent parameter. This can be used to limit the maximum concurrency, by subscribing concurrently to a limited number of subsequences. It works perfectly when new subsequences are pushed at a slower rate than the rate of the completion of the subscribed subsequences, but it becomes problematic when new subsequences are pushed faster than that. What happens is that the subsequences are buffered in an internal buffer with a forever increasing size, and also that the currently subscribed subsequences are becoming older and older. Here is a demonstration of this problem:

await Observable
    .Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
    .Select(_ => Observable
        .Return(DateTime.Now)
        .Do(d => Console.WriteLine(
            $"Then: {d:HH:mm:ss.fff}, " +
            $"Now: {DateTime.Now:HH:mm:ss.fff}, " +
            $"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
        .Delay(TimeSpan.FromMilliseconds(1000)))
    .Merge(maxConcurrent: 1)
    .Take(10);

A new subsequence is pushed every 10 milliseconds, and each subsequence completes after 1000 milliseconds. The subsequences are merged with maximum concurrency 1 (sequentially).

Output:

Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes

(Try it on Fiddle)

The memory usage grows steadily, and the time gap between the creation and subscription of each subsequence grows as well.

What I would like to have is a custom Merge variant that has an internal buffer with limited size. When the buffer is full, any incoming subsequence should cause the currently oldest buffered subsequence to be dropped. Here is a marble diagram of the desirable behavior, configured with maximum concurrency = 1 and buffer capacity = 1:

Source: +----A------B------C------|
A:           +-------a----a---|
B:                  not-subscribed
C:                            +-----c----|
Result: +------------a----a---------c----|
  • The subsequence A was subscribed immediately after it was emitted.
  • Then the B was emitted and was stored in the buffer because the A had not completed yet.
  • Then the C was emitted and replaced the B in the buffer. As a result the B subsequence was dropped and was never subscribed.
  • The completion of the subsequence A was followed by the immediate subscription of the buffered subsequence C.
  • The final result contains the merged values emitted by the A and C subsequences.

How could I implement a custom Rx operator with this specific behavior? Here is the stub of the operator I am trying to implement:

public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency,
    int boundedCapacity)
{
    return source.Merge(maximumConcurrency);
    // TODO: enforce the boundedCapacity policy somehow
}

Solution

  • I came up with a functional solution, I'm not sure it's the way to go, just because of complexity. But I think I covered all the bases.

    First, if you take a functional approach, this is a relatively simple state-machine problem: The state needs to know how many observables are currently executing and the buffer queue. The two events that can affect the state are a new Observable entering the buffer queue (causes an enqueue on the buffer queue), or a currently-executing observable terminating (causes a dequeue on the buffer queue).

    Since state-machine basically means Scan, and Scan can only work with one type, we'll have to coerce our two events into one type, which I called Message below. The state machine then knows all and can do the work of the Merge(n) overload.

    The last trick is the loop-back: Since the completing Observable is 'downstream' from Scan, we need to 'loop-back' the termination of that observable into Scan. For that, I always refer back to the Drain function in [this answer][1].

    public static class X
    {
        public static IObservable<T> MergeBounded<T>(
            this IObservable<IObservable<T>> source,
            int maximumConcurrency,
            int boundedCapacity)
        {
            return Observable.Defer(() =>
            {
                var capacityQueue = new Subject<Unit>();
    
                var toReturn = source.Publish(_source => _source
                    .Select(o => Message.Enqueue(o))
                    .Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
                    .Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
                    {
                        var buffer = state.buffer;
                        var bufferCount = state.bufferCount;
                        var executionCount = state.executionCount;
                        if (message.IsEnqueue)
                        {
                            if (executionCount < maximumConcurrency)
                                return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);
    
                            buffer = buffer.Enqueue(message.Object);
                            if (bufferCount == boundedCapacity)
                                buffer = buffer.Dequeue();
                            else
                                bufferCount++;
                            return (bufferCount, buffer, executionCount, null);
                        }
                        else
                        {
                            if (bufferCount == 0)
                                return (0, buffer, executionCount - 1, null);
                            else
                                return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
                        }
                    })
                    .Where(t => t.item != null)
                    .Select(t => t.item)
                    .Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
                    .TakeUntil(_source.IgnoreElements().Materialize())
                    .Merge()
                );
    
                return toReturn;
            });
    
        }
    
        public class Message
        {
            public static Message<T> Enqueue<T>(T t)
            {
                return Message<T>.Enqueue(t);
            }
    
            public static Message<T> Dequeue<T>(T t)
            {
                return Message<T>.Dequeue(t);
            }
    
        }
    
        public class Message<T>
        {
            private readonly T _t;
            private readonly bool _isEnqueue;
            private Message(bool isEnqueue, T t)
            {
                _t = t;
                _isEnqueue = isEnqueue;
            }
            
            public static Message<T> Enqueue(T t)
            {
                return new Message<T>(true, t);
            }
    
            public static Message<T> Dequeue(T t)
            {
                return new Message<T>(false, t);
            }
            
            public bool IsEnqueue => _isEnqueue;
            public T Object => _t;
        }
    }
    

    I wrote some test-code (based on original question) to verify, if you want to piggy back off of that. Test now passing:

    //              T: 0123456789012345678901234567890123
    //            T10: 0         1         2         3
    //         Source: +----A------B------C------|
    //              A:      +-------a----a---|
    //              B:             +----------b----b---|
    //              C:                    +--------c----|
    // ExpectedResult: +------------a----a---------c----|
    
    
    var ts = new TestScheduler();
    
    var A = ts.CreateHotObservable(
        ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
    );
    var B = ts.CreateHotObservable(
        ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
        ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
        ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
    );
    var C = ts.CreateHotObservable(
        ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
        ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
    );
    var source = ts.CreateHotObservable(
        ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
        ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
        ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
        ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
    );
    var observer = ts.CreateObserver<string>();
    var testResult = source.MergeBounded(1, 1);
    testResult.Subscribe(observer);
    
    var expected = ts.CreateHotObservable(
        ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
        ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
    );
    ts.Start();
    //observer.Messages.Dump("Actual");   // Linqpad
    //expected.Messages.Dump("Expected"); // Linqpad
    ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
    

    (Test code passes without exception)