javascript c# rxjs reactive-programming system.reactive

An Rx observable that would act as ReplaySubject but only for the 1st subscriber?


What's an elegant way of composing an Rx observable that resembles ReplaySubject, but emits the accumulated sequence only once, and only to the first subscriber (at the moment that subscriber connects)? After the first subscription, it should act just like a regular Subject.

This is for a .NET project, but I'd equally appreciate JavaScript/RxJS answers.

I did google for potential solutions, and I'm about to roll my own, similar to how I eventually approached DistinctSubject.


Solution

  • I slightly modified the implementation found in a similar question, and renamed the class from ReplayOnceSubject to ReplayFirstSubscriberOnlySubject:

    using System;
    using System.Reactive.Subjects;

    public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
    {
        private readonly object _locker = new object();
        // Starts as a ReplaySubject<T>; swapped for a plain Subject<T>
        // when the first observer subscribes.
        private ISubject<T> _subject = new ReplaySubject<T>();
    
        public void OnNext(T value) { lock (_locker) _subject.OnNext(value); }
        public void OnError(Exception error) { lock (_locker) _subject.OnError(error); }
        public void OnCompleted() { lock (_locker) _subject.OnCompleted(); }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            lock (_locker)
            {
                // First subscription: the buffer is still in place.
                if (_subject is ReplaySubject<T> replaySubject)
                {
                    var subject = new Subject<T>();
                    var subscription = subject.Subscribe(observer);
                    // Now replay the buffered notifications
                    replaySubject.Subscribe(subject).Dispose();
                    // Release the buffer; from here on, act as a regular Subject<T>
                    replaySubject.Dispose();
                    _subject = subject;
                    return subscription;
                }
                else
                    return _subject.Subscribe(observer);
            }
        }
    }
    

    This is probably not the most efficient solution, since two different locks are acquired on every operation (the _locker and the internal _gate), but it shouldn't perform badly either.
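
Since the question also asks about JavaScript, here is a minimal, dependency-free sketch of the same idea: buffer notifications until the first subscriber arrives, hand the buffer to that subscriber only, then behave like a plain subject. It deliberately does not use RxJS; the class name is borrowed from the C# answer, and the `deliver` helper and the `{ next, error, complete }` observer shape are assumptions made for illustration. JavaScript runs this single-threaded, so no lock is needed.

```javascript
// Illustrative sketch only: not part of RxJS, names are hypothetical.
function deliver(observer, n) {
  if (n.kind === 'next') observer.next?.(n.value);
  else if (n.kind === 'error') observer.error?.(n.value);
  else observer.complete?.();
}

class ReplayFirstSubscriberOnlySubject {
  constructor() {
    this.buffer = [];           // notifications recorded before the first subscription
    this.observers = new Set(); // live observers once the buffer has been handed over
    this.replayed = false;      // true once the first subscriber has been served
    this.terminal = null;       // remembered error/complete notification, if any
  }

  next(value) { this._emit({ kind: 'next', value }); }
  error(err) { this._emit({ kind: 'error', value: err }); }
  complete() { this._emit({ kind: 'complete' }); }

  _emit(n) {
    if (this.terminal) return;                  // ignore anything after termination
    if (n.kind !== 'next') this.terminal = n;
    if (!this.replayed) { this.buffer.push(n); return; }
    for (const o of [...this.observers]) deliver(o, n);
    if (this.terminal) this.observers.clear();
  }

  subscribe(observer) {
    if (!this.replayed) {
      // First subscriber: replay the buffer once, then drop it.
      this.replayed = true;
      const pending = this.buffer;
      this.buffer = [];
      for (const n of pending) deliver(observer, n);
    } else if (this.terminal) {
      // Late subscriber to a finished subject: only the terminal notification.
      deliver(observer, this.terminal);
    }
    if (!this.terminal) this.observers.add(observer);
    return { unsubscribe: () => this.observers.delete(observer) };
  }
}

// Usage: the first subscriber sees the backlog, the second does not.
const subject = new ReplayFirstSubscriberOnlySubject();
const seen = { a: [], b: [] };
subject.next(1);
subject.next(2);
subject.subscribe({ next: v => seen.a.push(v) }); // gets the buffered 1 and 2
subject.subscribe({ next: v => seen.b.push(v) }); // gets nothing yet
subject.next(3);                                  // delivered to both
console.log(seen.a, seen.b);
```

After the run, `seen.a` holds 1, 2, 3 while `seen.b` holds only 3, which is the behavior the question describes.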