
Rx.Net GroupBy, How to subscribe multiple observers to a specific Group


I am taking my first leap into the world of Rx and finding it difficult to get the desired results, especially with the GroupBy operator, so any help would be much appreciated.

How can I subscribe multiple observers to a specific group?

My requirements are:

I have a DataProvider class that makes HTTP API requests at regular intervals.
The HTTP response is a List<Item>. Each Item has a unique Id property.
I need to process each Item as a separate stream based on its Id, which looks like a case for GroupBy.
Each group needs its own pipeline where:

  • It starts with a specific value (StartWith operator)
  • It buffers the previous Item for comparison with the current Item (Buffer(2,1) operator)
  • If the current Item differs from the previous one, it emits the current Item (Where operator)

The result is a single IObservable<Item> of changes (ChangeStream) that merges all the groups back together, so I am no longer dealing with a specific group.
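To make the flattening concrete, here is a minimal sketch of the per-group pipeline listed above. It assumes value tuples `(int Id, int Value)` as a hypothetical stand-in for Item, a hypothetical `ChangeDetectionDemo` helper, and a finite in-memory source instead of the polled HTTP responses. Because SelectMany merges every group back into one sequence, the changes for Id 1 and Id 2 come out interleaved, and the group key can only be recovered from the items themselves.

```csharp
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo helper; (int Id, int Value) tuples stand in for the question's Item class.
public static class ChangeDetectionDemo
{
    public static IList<(int Id, int Value)> Run()
    {
        var responses = new (int Id, int Value)[]
        {
            (1, 0), (2, 0),   // first poll
            (1, 0), (2, 1),   // second poll: only Id 2 changed
            (1, 1), (2, 1),   // third poll: only Id 1 changed
        };

        return responses.ToObservable()
            .GroupBy(item => item.Id)
            .SelectMany(grp => grp
                .StartWith((Id: grp.Key, Value: -1))   // seed value, like the "initial item from Db"
                .Buffer(2, 1)                           // sliding [previous, current] pairs
                .Where(pair => pair.Count == 2 && pair[0].Value != pair[1].Value)
                .Select(pair => pair[1]))               // emit only the changed item
            .ToList()                                   // collect the flattened change stream
            .Wait();                                    // block until the finite source completes
    }
}
```

Calling `ChangeDetectionDemo.Run()` yields (1, 0), (2, 0), (2, 1), (1, 1): one seed-to-first-value change per Id, followed only by genuine value changes, all on one merged stream.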

How can I stay within the group pipeline and allow multiple observers to subscribe to a specific group?

  • Observers can subscribe early (before the Item.Id has appeared on the response stream and before the group is created)
  • Observers can subscribe late (after the Item.Id has appeared on the response stream and the group has been created)
  • Late subscribers should receive the last change for that Item.Id (as with Replay(1)), but I can’t figure this part out either.
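The last point is where a plain Replay(1) falls short. The sketch below (a hypothetical `ReplayOneDemo` helper, again using `(int Id, int Value)` tuples in place of Item) applies Replay(1) to a single flattened change stream. The replay buffer only remembers the most recent change of any Id, so a late subscriber filtering on Id 1 receives nothing once Id 2 has changed after it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo helper; (int Id, int Value) tuples stand in for Item.
public static class ReplayOneDemo
{
    // Returns the values two late subscribers receive from a Replay(1) change stream.
    public static (List<int> LateId1, List<int> LateId2) Run()
    {
        var changes = new Subject<(int Id, int Value)>();
        IConnectableObservable<(int Id, int Value)> replayed = changes.Replay(1);

        using (replayed.Connect())
        {
            changes.OnNext((1, 10));   // last change for Id 1
            changes.OnNext((2, 20));   // last change for Id 2; evicts (1, 10) from the buffer

            var lateId1 = new List<int>();
            var lateId2 = new List<int>();

            // Both observers subscribe only after the changes were emitted.
            replayed.Where(c => c.Id == 1).Subscribe(c => lateId1.Add(c.Value));
            replayed.Where(c => c.Id == 2).Subscribe(c => lateId2.Add(c.Value));

            return (lateId1, lateId2);
        }
    }
}
```

Here `LateId1` ends up empty and `LateId2` contains only 20, which is the gap a per-key replay buffer has to fill.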

What is the Rx way to Multicast a specific group? Any help or advice would be much appreciated. I have provided sample code below.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace RxTest
{
    class Program
    {
        static void Main(string[] args)
        {

            var dataService = new MockDataService();

            // How do I subscribe to a specific group?
            // Eg. I am only interested in changes to Items where Id == 1
            // Subscribers can be early (before the stream is hot)
            var item1Stream = dataService.SubscribeToItem(1);

            // There can be multiple subscribers to a group
            var item1Stream2 = dataService.SubscribeToItem(1);

            Console.WriteLine("Press Enter to Start");
            Console.ReadLine();

            dataService.Start();

            // Subscribers can be late (Eg. Subscriber to Item Id == 2 after it has emitted items)
            Thread.Sleep(2000);
            var item2Stream = dataService.SubscribeToItem(2);

            // Subscribers can be early (After connect but before the Item Id appears on the Stream (before group creation))
            // Eg. Subscribe to group 4 (Group 4 doesn't get created until 20s after connect in this example)
            var item4Stream = dataService.SubscribeToItem(4);

            // What is the Rx way to Multicast a Group?

            Console.WriteLine("Press Enter to Exit");
            Console.ReadLine();

            dataService.Stop();
        }
    }

    public class MockDataService
    {
        private readonly IConnectableObservable<Item> _itemsStream;
        private IDisposable _itemsSubscription;

        private readonly IObservable<Item> _changeStream;
        private IDisposable _changeSubscription;


        public MockDataService()
        {
            // Simulate Http response pipeline.
            //// Time:   1s...............10s..............20s.....etc
            //// stream: [[1][2]]repeat...[[2][3]]repeat...[[3][4]]repeat...
            IObservable<List<Item>> responseStream = Observable.Interval(TimeSpan.FromSeconds(1))
                .Take(50)
                .Select(tick =>
                {
                    // Every 10 ticks an item drops off the stream and a new one starts
                    // Every 2 ticks the Item value increases to generate a change.
                    int rangeStart = Math.Min(((int)tick / 10) + 1, 5);
                    return Enumerable.Range(rangeStart, 2).Select(id => new Item(id, (int)tick / 2)).ToList();
                });

            // Flatten the list into IObservable<Item>
            //// Time:   1s.............10s............20s.....etc
            //// stream: [1][2]repeat...[2][3]repeat...[3][4]repeat...
            _itemsStream = responseStream
                .SelectMany(list => list)
                .Publish();

            // Split into groups by Item.Id and process each group for changes
            // ChangeStream is an IObservable<Item> that have changes.
            _changeStream = _itemsStream
                .GroupBy(item => item.Id)
                    .SelectMany(grp =>
                        grp
                        // Pipeline for each group.
                        .StartWith(new Item(grp.Key, -1))                       // Initial item from Db
                        //.TakeUntil(Item => Item.IsComplete())                 // Logic to complete the group
                        .LogConsoleWithThread($"Group: {grp.Key}")
                        .Buffer(2, 1)
                        .Where(buffer => buffer.Count == 2 && buffer[0].HasChanges(buffer[1]))
                        .Select(buffer => buffer[1])
                        .LogConsoleWithThread($"Group.Change : {grp.Key}")
                        // How do I push changes in this group to Zero..Many subscribed Observers?
                        // I would also like to Replay(1) to all late subscribers to a group.
                        );
        }

        /// <summary>
        /// How to get the IObservable for a specific group?
        /// </summary>
        /// <param name="itemId"></param>
        /// <returns></returns>
        public IObservable<Item> SubscribeToItem(int itemId)
        {
            // ????
            return null;
        }

        public void Start()
        {
            _changeSubscription = _changeStream.SubscribeConsole("ChangeStream");
            _itemsSubscription = _itemsStream.Connect();
        }

        public void Stop()
        {
            _changeSubscription.Dispose();
            _itemsSubscription.Dispose();
        }

    }


    public class Item
    {
        public int Id { get; private set; }

        public int Value { get; private set; }

        public Item(int id, int value)
        {
            Id = id;
            Value = value;
        }

        public bool HasChanges(Item compareItem)
        {
            return this.Value != compareItem.Value;
        }

        public override string ToString()
        {
            return $"Item: Id={Id}  Value={Value}";
        }
    }


    public static class RxExtensions
    {
        public static IDisposable SubscribeConsole<T>(this IObservable<T> observable, string name = "")
        {
            return observable.Subscribe(new ConsoleObserver<T>(name));
        }


        /// <summary>
        /// Logs to the Console the subscriptions and emissions done on/by the observable
        /// each log message also includes the thread it happens on
        /// </summary>
        /// <typeparam name="T">The Observable Type</typeparam>
        /// <param name="observable">The Observable to log.</param>
        /// <param name="name">An optional name prefix that will be added before each notification</param>
        /// <returns></returns>
        public static IObservable<T> LogConsoleWithThread<T>(this IObservable<T> observable, string name = "")
        {
            return Observable.Defer(() =>
            {
                Console.WriteLine("{0} Subscription happened on Thread: {1}", name, Thread.CurrentThread.ManagedThreadId);

                return observable.Do(
                    x => Console.WriteLine("{0} - OnNext({1}) Thread: {2}", name, x, Thread.CurrentThread.ManagedThreadId),
                    ex =>
                    {
                        Console.WriteLine("{0} - OnError Thread:{1}", name, Thread.CurrentThread.ManagedThreadId);
                        Console.WriteLine("\t {0}", ex);
                    },
                    () => Console.WriteLine("{0} - OnCompleted() Thread {1}", name, Thread.CurrentThread.ManagedThreadId));
            });
        }
    }

    /// <summary>
    /// An observer that outputs to the console each time the OnNext, OnError or OnComplete occurs
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class ConsoleObserver<T> : IObserver<T>
    {
        private readonly string _name;

        public ConsoleObserver(string name = "")
        {
            _name = name;
        }

        public void OnNext(T value)
        {
            Console.WriteLine("{0} - OnNext({1})", _name, value);
        }

        public void OnError(Exception error)
        {
            Console.WriteLine("{0} - OnError:", _name);
            Console.WriteLine("\t {0}", error);
        }

        public void OnCompleted()
        {
            Console.WriteLine("{0} - OnCompleted()", _name);
        }
    }


}

Solution

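  • You probably need a specialized publishing operator, because the existing ones are either too narrow or too broad for your needs: Publish replays nothing to late subscribers, PublishLast only delivers the final element after the source completes, Replay(1) keeps the last element of the whole stream rather than per key, and Replay() keeps everything. So you'll need the Multicast operator, supplied with a custom replay subject that buffers only the last element per key. Here is a basic implementation of such a subject: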

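    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    // Relies on the Rx observer contract (OnNext/OnError/OnCompleted are never
    // called concurrently); the Dictionary below is not synchronized.
    public class ReplayLastPerKeySubject<T, TKey> : ISubject<T>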
    {
        private readonly Func<T, TKey> _keySelector;
        private readonly ReplaySubject<ReplaySubject<T>> _subjects;
        private readonly IObservable<T> _mergedSubjects;
        private readonly Dictionary<TKey, ReplaySubject<T>> _perKey;
    
        public ReplayLastPerKeySubject(Func<T, TKey> keySelector)
        {
            _keySelector = keySelector;
            _subjects = new ReplaySubject<ReplaySubject<T>>();
            _mergedSubjects = _subjects.Merge();
            _perKey = new Dictionary<TKey, ReplaySubject<T>>();
        }
    
        public void OnNext(T value)
        {
            var key = _keySelector(value);
            ReplaySubject<T> subject;
            if (!_perKey.TryGetValue(key, out subject))
            {
                subject = new ReplaySubject<T>(1);
                _perKey.Add(key, subject);
                _subjects.OnNext(subject);
            }
            subject.OnNext(value);
        }
    
        public void OnCompleted()
        {
            // All subjects, inner and outer, must be completed
            _subjects.OnCompleted();
            _subjects.Subscribe(subject => subject.OnCompleted());
        }
    
        public void OnError(Exception error)
        {
            // Faulting the master (outer) subject is enough
            _subjects.OnError(error);
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _mergedSubjects.Subscribe(observer);
        }
    }
    

    This implementation is based on an answer to a similar question, written by an Rx expert. The original answer uses a Concat observable for subscribing the observers, while this one uses a Merge observable, so I am not 100% sure about its correctness and efficiency.

    With this subject in place, the rest is easy. First, create a published version of your original observable:

    var published = YourObservable   // e.g. the question's _changeStream
        .Multicast(new ReplayLastPerKeySubject<Item, int>(x => x.Id))
        .RefCount();
    

    Finally, you can create a change stream for a specific key with the Where operator:

    var changeStream13 = published.Where(x => x.Id == 13);
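On the Merge versus Concat doubt raised in the answer, a small experiment may help. It is not a proof of correctness, and it only covers this particular structure, where the inner per-key subjects stay open until OnCompleted. The hypothetical `MergeVsConcatDemo` helper feeds two inner ReplaySubject<string>(1) instances (hypothetical keys A and B) through an outer ReplaySubject and compares what Merge and Concat deliver. Concat waits for the first inner sequence to complete before subscribing to the next, so B's values never reach its subscriber while A is still live; Merge subscribes to every inner subject as soon as it appears.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo helper mirroring the outer/inner subject layout of ReplayLastPerKeySubject.
public static class MergeVsConcatDemo
{
    public static (List<string> Merged, List<string> Concatenated) Run()
    {
        var merged = new List<string>();
        var concatenated = new List<string>();

        // Outer subject of per-key inner subjects, flattened two different ways.
        var outer = new ReplaySubject<ReplaySubject<string>>();
        outer.Merge().Subscribe(merged.Add);
        outer.Concat().Subscribe(concatenated.Add);

        var keyA = new ReplaySubject<string>(1);
        var keyB = new ReplaySubject<string>(1);
        outer.OnNext(keyA);
        outer.OnNext(keyB);

        // Nothing is completed, mirroring a live feed.
        keyA.OnNext("A1");
        keyB.OnNext("B1");
        keyA.OnNext("A2");

        return (merged, concatenated);
    }
}
```

With Merge the subscriber sees A1, B1, A2; with Concat it sees only A1, A2, because key B is never subscribed while key A is still open.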
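The Multicast(...).RefCount() step can also be checked in isolation. In this sketch a plain ReplaySubject<int>(1) stands in for ReplayLastPerKeySubject so the block is self-contained, and the Defer-based counter inside the hypothetical `MulticastRefCountDemo` helper exists purely for illustration. It shows that two observers share a single upstream subscription, that the second observer still receives the replayed value, and that RefCount disconnects once the last observer is disposed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo helper; ReplaySubject<int>(1) stands in for the custom per-key subject.
public static class MulticastRefCountDemo
{
    public static (int SourceSubscriptions, List<int> First, List<int> Second) Run()
    {
        var sourceSubscriptions = 0;
        var upstream = new Subject<int>();

        // Defer runs its factory once per subscription, so this counts upstream subscriptions.
        var source = Observable.Defer(() =>
        {
            sourceSubscriptions++;
            return upstream.AsObservable();
        });

        var published = source
            .Multicast(new ReplaySubject<int>(1))
            .RefCount();

        var first = new List<int>();
        var second = new List<int>();

        using (published.Subscribe(first.Add))          // first observer triggers Connect
        {
            upstream.OnNext(1);
            using (published.Subscribe(second.Add))     // shares the connection, gets 1 replayed
            {
                upstream.OnNext(2);
            }
        }                                               // last observer gone: RefCount disconnects

        return (sourceSubscriptions, first, second);
    }
}
```

Both lists end up as 1, 2 while the source is subscribed only once.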