Search code examples
c#unity-game-enginesystem.reactivereactive-programmingobserver-pattern

Count all subscriptions of a subject


I have a Subject to which I subscribe methods that should be called when a particular event in the game happens.

public Subject<SomeEvent> TestSubject = new Subject<SomeEvent>();

Some instances subscribe to that Subject.

TestSubject.Subscribe(MyMethod);

My objective is to count how many methods have been subscribed to that Subject. I've seen some examples using the Count() extension, but Count() returns an IObservable<int>, and I need a plain int as the return value so that I can use it somewhere else.

if (subjectCount > 0)
{
    DoSomething();
}

Is there any way to get the number of subscriptions on a subject, or do I need to keep track of them manually (having a public int SubjectSubscriptions and adding 1 every time I subscribe a method)?


Solution

  • Easiest way would be to create your own implementation of ISubject with a wrapper around a subject.

    /// <summary>
    /// An <see cref="ISubject{T}"/> wrapper that forwards every notification and subscription
    /// to an inner subject while keeping a running count of the subscriptions that were made
    /// through this wrapper and have not been disposed yet.
    /// </summary>
    /// <remarks>
    /// The count only drops when the <see cref="IDisposable"/> returned by
    /// <see cref="Subscribe"/> is disposed; subscriptions made directly on the inner subject
    /// are not seen by this wrapper.
    /// </remarks>
    /// <typeparam name="T">Type of the values carried by the subject.</typeparam>
    public class CountSubject<T> : ISubject<T>, IDisposable
    {
        private readonly ISubject<T> _baseSubject;
        private int _counter;
        private IDisposable _disposer = Disposable.Empty;
        private bool _disposed;

        /// <summary>
        /// Number of subscriptions created through <see cref="Subscribe"/> whose returned
        /// handle has not been disposed.
        /// </summary>
        public int Count
        {
            get { return _counter; }
        }

        /// <summary>
        /// Creates a counting wrapper around a new <see cref="Subject{T}"/> that this
        /// instance owns and disposes in <see cref="Dispose()"/>.
        /// </summary>
        public CountSubject()
            : this(new Subject<T>())
        {
            // We created the inner subject ourselves, so we are responsible for disposing it.
            _disposer = (IDisposable) _baseSubject;
        }

        /// <summary>
        /// Creates a counting wrapper around an existing subject. The supplied subject is
        /// NOT disposed by this wrapper; its lifetime stays with the caller.
        /// </summary>
        /// <param name="baseSubject">Subject that receives all forwarded calls.</param>
        /// <exception cref="ArgumentNullException"><paramref name="baseSubject"/> is null.</exception>
        public CountSubject(ISubject<T> baseSubject)
        {
            // Fail fast instead of a NullReferenceException on the first forwarded call.
            if (baseSubject == null)
            {
                throw new ArgumentNullException("baseSubject");
            }

            _baseSubject = baseSubject;
        }

        /// <summary>Forwards the completion notification to the inner subject.</summary>
        public void OnCompleted()
        {
            _baseSubject.OnCompleted();
        }

        /// <summary>Forwards the error notification to the inner subject.</summary>
        /// <param name="error">Exception to forward.</param>
        public void OnError(Exception error)
        {
            _baseSubject.OnError(error);
        }

        /// <summary>Forwards a value to the inner subject.</summary>
        /// <param name="value">Value to forward.</param>
        public void OnNext(T value)
        {
            _baseSubject.OnNext(value);
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the inner subject and counts the subscription.
        /// </summary>
        /// <param name="observer">Observer to subscribe.</param>
        /// <returns>
        /// A handle that, when disposed, decrements <see cref="Count"/> and disposes the
        /// inner subscription.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            // Count first so the new subscriber is already included if the inner subject
            // notifies the observer synchronously during Subscribe.
            Interlocked.Increment(ref _counter);

            IDisposable subscription;
            try
            {
                subscription = _baseSubject.Subscribe(observer);
            }
            catch
            {
                // The subscription never came into existence; undo the increment so
                // Count does not drift upwards on failed subscribe attempts.
                Interlocked.Decrement(ref _counter);
                throw;
            }

            return new CompositeDisposable(Disposable.Create(() => Interlocked.Decrement(ref _counter)),
                                           subscription);
        }

        /// <summary>
        /// Disposes the inner subject if it was created by the parameterless constructor.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases owned resources once; subsequent calls are no-ops.
        /// </summary>
        /// <param name="disposing">
        /// True when called from <see cref="Dispose()"/>; only then is the owned subject disposed.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                // Disposable.Empty unless we own the inner subject, so this is safe either way.
                _disposer.Dispose();
            }

            _disposed = true;
        }
    }