Tags: c#, system.reactive, rx.net

How to make a lightweight `Replay` operator that can be subscribed only once?


On various occasions I've wished for an Rx Replay operator that buffers the incoming notifications, replays its buffer synchronously when it is subscribed for the first time, and stops buffering after that. This lightweight Replay operator should be able to serve only one subscriber. One use case for such an operator can be found here, where continuing to buffer after the first subscription is just a waste of resources. For demonstration purposes, here is a contrived example of the problematic behavior I'd like to avoid:

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    .SelectMany(x => Enumerable.Range((int)x * 100_000 + 1, 100_000))
    .Take(800_000)
    .Do(x =>
    {
        if (x % 100_000 == 0) Console.WriteLine(
            $"{DateTime.Now:HH:mm:ss.fff} > " +
            $"Produced: {x:#,0}, TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
    })
    .Replay()
    .AutoConnect(0);

await Task.Delay(2200);

Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");

// First subscription
await observable.Do(x =>
{
    if (x % 100_000 == 0)
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});

// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");

The observable generates 800,000 values in total. AutoConnect(0) connects the Replay mechanism to the source immediately, and the first subscription happens roughly halfway through the sequence, before the source completes.

Output:

16:54:19.893 > Produced: 100,000, TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,000, TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,000, TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,000, TotalMemory: 2,212,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,000, TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,000, TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,000, TotalMemory: 6,411,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,000, TotalMemory: 6,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000

The memory usage keeps growing after the subscription. This is expected, because all values are buffered and remain buffered for the whole lifetime of the replayed observable. The desired behavior would be for the memory usage to drop sharply after the subscription: the buffer should be discarded once the buffered values have been propagated, because it serves no purpose after that point. Also, the second subscription (the await observable.Count()) should fail with an InvalidOperationException. I don't want to be able to subscribe to the observable again after it has lost its Replay functionality.
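The retained buffer can also be observed without any timing noise. The sketch below swaps the Observable.Interval source for a Subject<int> (a substitution made purely for illustration) and uses the built-in Replay operator, which replays all notifications to every subscriber. A late second subscriber still receives every value, showing that nothing is ever discarded:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Illustrative source: a Subject<int> pushed manually instead of Observable.Interval
var source = new Subject<int>();
var replayed = source.Replay(); // replays all notifications, unbounded buffer
replayed.Connect();             // start buffering before anyone subscribes

source.OnNext(1);
source.OnNext(2);

var first = new List<int>();
replayed.Subscribe(first.Add);  // receives the buffered 1, 2 synchronously

source.OnNext(3);               // delivered live, and also appended to the buffer

var second = new List<int>();
replayed.Subscribe(second.Add); // receives 1, 2, 3: the buffer was never released

Console.WriteLine(string.Join(", ", first));  // 1, 2, 3
Console.WriteLine(string.Join(", ", second)); // 1, 2, 3
```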

Here is the stub of the custom ReplayOnce operator I am trying to implement. Does anyone have any idea how to implement it?

public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
    return source.Replay(); // TODO: enforce the subscribe-once policy
}

By the way, there is a related question here, about how to make a Replay operator with a buffer that can be emptied occasionally on demand. My question is different in that I want the buffer to be completely disabled after the subscription, and never start growing again.


Solution

  • I came up with an implementation of the ReplayOnce operator, which is based on multicasting a custom ReplayOnceSubject<T>. This subject is initially backed by a ReplaySubject<T>, which is replaced with a plain Subject<T> during the first (and only allowed) subscription:

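    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    // Extension methods must be declared in a non-generic static class;
    // both members below are meant to live inside such a class.
    public static IConnectableObservable<T> ReplayOnce<T>(
        this IObservable<T> source)
    {
        return source.Multicast(new ReplayOnceSubject<T>());
    }
    
    private class ReplayOnceSubject<T> : ISubject<T>
    {
        private readonly object _locker = new object();
        // Buffers every notification until the first subscription arrives
        private ISubject<T> _subject = new ReplaySubject<T>();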
    
        public void OnNext(T value)
        {
            lock (_locker) _subject.OnNext(value);
        }
    
        public void OnError(Exception error)
        {
            lock (_locker) _subject.OnError(error);
        }
    
        public void OnCompleted()
        {
            lock (_locker) _subject.OnCompleted();
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            lock (_locker)
            {
                if (_subject is ReplaySubject<T> replaySubject)
                {
                    var subject = new Subject<T>();
                    var subscription = subject.Subscribe(observer);
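                    // Replay the buffered notifications (including any
                    // OnError/OnCompleted) synchronously while holding the lock,
                    // so that no live notification can interleave with them
                    replaySubject.Subscribe(subject).Dispose();
                    // From now on, notifications bypass the buffer entirely
                    _subject = subject;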
                    return subscription;
                }
                else
                    throw new InvalidOperationException("Already subscribed.");
            }
        }
    }
    

    The line replaySubject.Subscribe(subject) ensures that not only the buffered values are propagated to the observer, but also any OnError/OnCompleted notification that the source has already emitted. Because this happens inside the lock, no new notification from the source can interleave with the replayed ones. After the subscription, the ReplaySubject is no longer referenced and becomes eligible for garbage collection.
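    The following sketch exercises the ReplayOnce operator above, assuming both members are compiled into a static class in the same project. A Subject<int> stands in for the real source (an illustrative choice) so the outcome is deterministic. It checks live forwarding after the replay, the replay of a completion that happened before the subscription, and the rejection of a second subscriber. How the InvalidOperationException reaches the caller depends on how Multicast forwards subscription errors, so the sketch accepts it either as a thrown exception or through onError:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Assumes the ReplayOnce extension method from the answer is in scope.

// Scenario A: subscribe halfway, then keep producing.
var source = new Subject<int>();
var replayed = source.ReplayOnce();
replayed.Connect();                 // start buffering immediately

source.OnNext(1);
source.OnNext(2);

var received = new List<int>();
var completed = false;
replayed.Subscribe(received.Add, () => completed = true); // replays 1, 2

source.OnNext(3);                   // forwarded live, no longer buffered
source.OnCompleted();

// A second subscription must be rejected; the exception may be thrown
// directly or routed to onError, so both paths are captured here.
Exception? secondError = null;
try
{
    replayed.Subscribe(_ => { }, ex => secondError = ex);
}
catch (InvalidOperationException ex)
{
    secondError = ex;
}

// Scenario B: the source completes before anyone subscribes.
var early = new Subject<int>();
var earlyReplayed = early.ReplayOnce();
earlyReplayed.Connect();
early.OnNext(10);
early.OnCompleted();

var earlyReceived = new List<int>();
var earlyCompleted = false;
earlyReplayed.Subscribe(earlyReceived.Add, () => earlyCompleted = true);

Console.WriteLine(string.Join(", ", received));      // 1, 2, 3
Console.WriteLine(completed);                         // True
Console.WriteLine(secondError?.GetType().Name);       // InvalidOperationException
Console.WriteLine(string.Join(", ", earlyReceived)); // 10
Console.WriteLine(earlyCompleted);                    // True
```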