Search code examples
observablereactiverx.netcombinelatest

Extensible Combination of Observables


I'd like to store IObservables of Lists in a container and subscribe to a combination of these observables retrieving a merged List. Then I would like to be able to add more Observables without having to renew the subscription and still get the new results. It should ideally also fire when adding the new observable to the store. The following code should explain more:

using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Linq;

namespace dynamic_combine
{
    class ObservableStuff
    {
        private List<IObservable<List<String>>> _listOfObservables = new List<IObservable<List<String>>>();

        public ObservableStuff() { }

        public void AddObservable(IObservable<List<String>> obs)
        {
            _listOfObservables.Add(obs);
        }

        public IObservable<IList<String>> GetCombinedObservable()
        {
            return Observable.CombineLatest(_listOfObservables)
                .Select((all) =>
                {
                    List<String> mergedList = new List<String>();
                    foreach(var list in all)
                    {
                        mergedList = mergedList.Concat(list).ToList();
                    }
                    return mergedList;
                });
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            ObservableStuff Stuff = new ObservableStuff();
            BehaviorSubject<List<String>> A = new BehaviorSubject<List<String>>(new List<String>() { "a", "b", "c" });
            BehaviorSubject<List<String>> B = new BehaviorSubject<List<String>>(new List<String>() { "d", "e", "f" });
            BehaviorSubject<List<String>> C = new BehaviorSubject<List<String>>(new List<String>() { "x", "y", "z" });

            Stuff.AddObservable(A.AsObservable());
            Stuff.AddObservable(B.AsObservable());

            Stuff.GetCombinedObservable().Subscribe((x) =>
            {
                Console.WriteLine(String.Join(",", x));
            });

            // Initial Output: a,b,c,d,e,f

            A.OnNext(new List<String>() { "1", "2", "3", "4", "5" });
            // Output: 1,2,3,4,5,d,e,f

            B.OnNext(new List<String>() { "6", "7", "8", "9", "0" });
            // Output: 1,2,3,4,5,6,7,8,9,0

            Stuff.AddObservable(C.AsObservable());
            // Wishful Output: 1,2,3,4,5,6,7,8,9,0,x,y,z

            C.OnNext(new List<String>() { "y", "e", "a", "h" });
            // Wishful Output: 1,2,3,4,5,6,7,8,9,0,y,e,a,h

            Console.WriteLine("Press the any key...");
            Console.ReadKey();
        }
    }
}

Although the example is in C# it finally needs to be implemented in rxCpp. Also it would be interresting to see implementation in other variations of Rx.

I've setup a repository to check the code and will probably extend it to other languages: https://gitlab.com/dwaldorf/rx-examples

BR, Daniel


Solution

  • First, a couple changes because your code isn't that easy to read. GetCombinedObservable can be re-written to this:

    public IObservable<IList<String>> GetCombinedObservable()
    {
        return Observable.CombineLatest(_listOfObservables)
            .Select(l => l.SelectMany(s => s).ToList());
    }
    

    Your problem reduces down to two things: You want the _listOfObservables to be dynamic, which means changing it from List<IObservable<T>> to IObservable<IObservable<T>>. The problem with doing that though, is that CombineLatest doesn't support IObservable<IObservable<T>>, so we'll have to create one.

    That brings us this fun, ugly little function (uses nuget package System.Collections.Immutable):

    public static class X
    {
        public static IObservable<List<T>> DynamicCombineLatest<T>(this IObservable<IObservable<T>> source)
        {
            return source
                .SelectMany((o, i) => o.Select(item => (observableIndex: i, item: item)))
                .Scan(ImmutableDictionary<int, T>.Empty, (state, t) => state.SetItem(t.observableIndex, t.item))
                .Select(dict => dict.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList());
        }
    }
    

    Now we can update your class:

    class ObservableStuff
    {
        private ReplaySubject<IObservable<List<String>>> _subject = new ReplaySubject<IObservable<List<String>>>(int.MaxValue);
    
        public ObservableStuff() { }
    
        public void AddObservable(IObservable<List<String>> obs)
        {
            _subject.OnNext(obs);
        }
    
        public IObservable<IList<String>> GetCombinedObservable()
        {
            return _subject
                .DynamicCombineLatest()
                .Select(l => l.SelectMany(s => s).ToList());
        }
    }