Tags: c#, system.reactive, rx.net

How can I clear the buffer on a ReplaySubject?



Periodically I need to clear the buffer (as an end-of-day event in my case) to stop the ReplaySubject from growing continually and eventually consuming all available memory.

Ideally I want to keep the same ReplaySubject instance, because the existing client subscriptions are still valid.


Solution

  • ReplaySubject doesn't offer a way to clear its buffer, but several constructor overloads constrain the buffer in different ways:

    • A maximum TimeSpan that items are retained for
    • A maximum item count
    • A combination of the above, which drops items as soon as either limit is exceeded.
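    A small sketch of those overloads (the limits of 1000 items and 8 hours are arbitrary example values, and BoundedReplayDemo is an illustrative name) also shows what a late subscriber gets from a count-bounded ReplaySubject:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Subjects;

    public static class BoundedReplayDemo
    {
        // The three bounding constructors; the limits are example values only.
        public static ReplaySubject<int> ByCount() => new ReplaySubject<int>(1000);
        public static ReplaySubject<int> ByTime() => new ReplaySubject<int>(TimeSpan.FromHours(8));
        public static ReplaySubject<int> ByBoth() => new ReplaySubject<int>(1000, TimeSpan.FromHours(8));

        // Pushes 1..count into a ReplaySubject that keeps at most bufferSize items,
        // then returns what a subscriber arriving afterwards is replayed.
        public static List<int> LateSubscriberSees(int bufferSize, int count)
        {
            var subject = new ReplaySubject<int>(bufferSize);
            for (var i = 1; i <= count; i++)
            {
                subject.OnNext(i);
            }

            var received = new List<int>();
            // The buffered items are replayed synchronously during Subscribe.
            subject.Subscribe(received.Add).Dispose();
            return received;
        }
    }
    ```

    With a buffer of 3 and five published values, the late subscriber receives only 3, 4 and 5. These bounds cap memory, but none of them lets you flush the buffer on demand.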

    A Clearable ReplaySubject

    This was quite an interesting problem. I decided to see how easy it would be to implement a clearable variation of ReplaySubject using existing subjects and operators (as these are well tested and robust). It turned out to be reasonably straightforward.

    I've run this through a memory profiler to check that it does the right thing. Call Clear() to flush the buffer; otherwise it behaves just like a regular unbounded ReplaySubject:

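    using System;
    using System.Reactive.Linq;      // Concat
    using System.Reactive.Subjects;  // ISubject<T>, ReplaySubject<T>, Subject<T>

    public class RollingReplaySubject<T> : ISubject<T>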
    {
        private readonly ReplaySubject<IObservable<T>> _subjects;
        private readonly IObservable<T> _concatenatedSubjects;
        private ISubject<T> _currentSubject;
    
        public RollingReplaySubject()
        {
            _subjects = new ReplaySubject<IObservable<T>>(1);
            _concatenatedSubjects = _subjects.Concat();
            _currentSubject = new ReplaySubject<T>();
            _subjects.OnNext(_currentSubject);
        }
    
        public void Clear()
        {
            _currentSubject.OnCompleted();
            _currentSubject = new ReplaySubject<T>();
            _subjects.OnNext(_currentSubject);
        }
    
        public void OnNext(T value)
        {
            _currentSubject.OnNext(value);
        }
    
        public void OnError(Exception error)
        {
            _currentSubject.OnError(error);
        }
    
        public void OnCompleted()
        {
            _currentSubject.OnCompleted();
            _subjects.OnCompleted();
            // a quick way to make the current ReplaySubject unreachable
            // except to in-flight observers, and not hold up collection
            _currentSubject = new Subject<T>();
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _concatenatedSubjects.Subscribe(observer);
        }
    }
    

    As with any Subject, respect the usual Rx rules and don't call methods on this class concurrently, including Clear(). You could add synchronization locks easily if needed.
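    A minimal sketch of that locking, assuming a hypothetical LockedSubject<T> wrapper that takes the subject plus its clear action (for RollingReplaySubject the action would be its Clear method), serializes every call behind a single lock:

    ```csharp
    using System;
    using System.Reactive.Subjects;

    // Hypothetical wrapper, not part of Rx: every call, including the clear
    // action, runs under one lock so calls never overlap.
    public sealed class LockedSubject<T> : ISubject<T>
    {
        private readonly ISubject<T> _inner;
        private readonly Action _clear;
        private readonly object _gate = new object();

        public LockedSubject(ISubject<T> inner, Action clear)
        {
            _inner = inner;
            _clear = clear;
        }

        public void Clear() { lock (_gate) { _clear(); } }
        public void OnNext(T value) { lock (_gate) { _inner.OnNext(value); } }
        public void OnError(Exception error) { lock (_gate) { _inner.OnError(error); } }
        public void OnCompleted() { lock (_gate) { _inner.OnCompleted(); } }

        // Conservative: subscriptions also take the lock, so a new subscriber
        // never interleaves with a Clear that is half-way through.
        public IDisposable Subscribe(IObserver<T> observer)
        {
            lock (_gate) { return _inner.Subscribe(observer); }
        }
    }
    ```

    Because ReplaySubject delivers to observers synchronously, observer code runs while the lock is held, so a slow observer will stall every producer.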

    It works by nesting a sequence of ReplaySubjects inside an outer ReplaySubject. The outer ReplaySubject (_subjects) has a buffer size of exactly one, which holds the current inner ReplaySubject (_currentSubject); it is populated on construction.

    The OnNext, OnError and OnCompleted methods forward to the _currentSubject ReplaySubject.

    Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects). Because the buffer size of _subjects is just 1, a new subscriber receives only the events of the most recent inner ReplaySubject, plus everything published after it.
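    A stripped-down sketch of the same arrangement, using plain ReplaySubjects instead of the class above (ConcatOfReplaysDemo, outer, first and second are illustrative names only), shows what a subscriber arriving after one roll-over receives:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public static class ConcatOfReplaysDemo
    {
        public static List<string> LateSubscriberAfterRoll()
        {
            // Outer subject keeps only the newest inner subject.
            var outer = new ReplaySubject<IObservable<string>>(1);
            var concatenated = outer.Concat();

            var first = new ReplaySubject<string>();
            outer.OnNext(first);
            first.OnNext("a");
            first.OnNext("b");

            // Roll over: complete the old inner subject and publish a fresh one.
            first.OnCompleted();
            var second = new ReplaySubject<string>();
            outer.OnNext(second);
            second.OnNext("c");

            // Subscribing now: outer replays only 'second', which replays "c".
            var received = new List<string>();
            concatenated.Subscribe(received.Add);
            return received;
        }
    }
    ```

    The values "a" and "b" never reach this subscriber, because the outer buffer of one no longer holds the first inner subject.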

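    Whenever we need to "clear the buffer", Clear() calls OnCompleted() on the existing _currentSubject, then creates a new ReplaySubject, pushes it into _subjects, and makes it the new _currentSubject. Existing subscribers move on to the new inner subject through Concat, and because _subjects buffers only one item, it no longer holds a reference to the old ReplaySubject.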
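    The complementary case, a subscriber that is already attached when the roll-over happens, can be sketched the same way (again with illustrative names rather than the class above):

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public static class RollOverDemo
    {
        public static List<string> EarlySubscriberAcrossRoll()
        {
            var outer = new ReplaySubject<IObservable<string>>(1);
            var current = new ReplaySubject<string>();
            outer.OnNext(current);

            // Subscribed before any roll-over happens.
            var received = new List<string>();
            outer.Concat().Subscribe(received.Add);

            current.OnNext("a");

            // The "Clear" step: complete the current inner subject, then swap in
            // a new one. Concat moves on to it once the old one has completed.
            current.OnCompleted();
            current = new ReplaySubject<string>();
            outer.OnNext(current);
            current.OnNext("b");

            return received;
        }
    }
    ```

    The early subscriber sees both "a" and "b" with no gap, which is why existing client subscriptions survive a Clear().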

    Enhancements

    Following @Brandon's suggestion, I created a version of RollingReplaySubject that uses either a TimeSpan or an input stream to signal when to clear the buffer. It is available as a Gist here: https://gist.github.com/james-world/c46f09f32e2d4f338b07
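    The Gist is not reproduced here. As an independent sketch of the time-based idea applied to the question's end-of-day case, a hypothetical EndOfDayClear helper could drive any clear action from Observable.Timer, firing at the next midnight and every 24 hours after that. It assumes the UTC offset does not change overnight, which does not hold across daylight-saving transitions:

    ```csharp
    using System;
    using System.Reactive.Linq;

    public static class EndOfDayClear
    {
        // Next midnight after 'now', keeping now's UTC offset.
        // Assumption: the offset is unchanged at midnight; across a DST change
        // the result is off by the DST shift.
        public static DateTimeOffset NextMidnight(DateTimeOffset now) =>
            new DateTimeOffset(now.Date.AddDays(1), now.Offset);

        // Hypothetical helper: invokes 'clear' (e.g. a RollingReplaySubject's Clear)
        // at the next midnight, then every fixed 24 hours. Dispose the result to stop.
        public static IDisposable ScheduleDailyClear(Action clear) =>
            Observable.Timer(NextMidnight(DateTimeOffset.Now), TimeSpan.FromDays(1))
                      .Subscribe(_ => clear());
    }
    ```

    Observable.Timer fires on the default scheduler (typically a thread-pool thread), so clear runs off the producer's thread; given the concurrency rule above, the Clear() call and the OnXXX calls then need a lock around them.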