Search code examples
c#system.reactivedynamic-data

Create a IObservableList from a list of IObservable


I'm looking for a reactive object that implement IObservable<IReadOnlyList<T>> and IList<IObservable<T>>.

That's it, I would like to be able to write :

var list = new MyReactiveList<int>();
var item = new Subject<int>();
list.Subscribe(values => Console.WriteLine($"[{string.Join(", ", values)}]"));
list.Add(item);
item.OnNext(1); // Will print out [1]

Solution

  • First up, your code that you posted in your question doesn't compile. I have fixed it the best I can:

    var list = new MyReactiveList<int>();
    var item = new Subject<int>();
    list.Subscribe(values => Console.WriteLine($"[{string.Join(", ", values)}]"));
    list.Add(item);
    item.OnNext(1); // Will print out [1]
    

    Now the exercise is just to implement class MyReactiveList<T> : IObservable<IReadOnlyList<T>>, IList<IObservable<T>>. That's fairly simply, but the only problem is to somehow turn a mutable List<IObservable<T>> into an IObservable<IReadOnlyList<T>> such that the observable updates itself when the list changes.

    Here it is:

    public class MyReactiveList<T> : IObservable<IReadOnlyList<T>>, IList<IObservable<T>>
    {
        private List<IObservable<T>> _list = new List<IObservable<T>>();
    
        private Subject<Unit> _update = new Subject<Unit>();
    
        public IDisposable Subscribe(IObserver<IReadOnlyList<T>> observer) =>
            _update
                .Select(_ => _list.CombineLatest().Select(x => new ReadOnlyList<T>(x)))
                .Switch()
                .Subscribe(observer);
    
        public IObservable<T> this[int index]
        {
            get => _list[index];
            set
            {
                _list[index] = value;
                _update.OnNext(Unit.Default);
            }
        }
    
        public int Count => _list.Count;
    
        public bool IsReadOnly => false;
    
        public void Add(IObservable<T> item)
        {
            _list.Add(item);
            _update.OnNext(Unit.Default);
        }
    
        public void Clear()
        {
            _list.Clear();
            _update.OnNext(Unit.Default);
        }
    
        public bool Contains(IObservable<T> item) => _list.Contains(item);
    
        public void CopyTo(IObservable<T>[] array, int arrayIndex)
        {
            _list.CopyTo(array, arrayIndex);
        }
    
        public IEnumerator<IObservable<T>> GetEnumerator() => _list.GetEnumerator();
    
        public int IndexOf(IObservable<T> item) => _list.IndexOf(item);
    
        public void Insert(int index, IObservable<T> item)
        {
            _list.Insert(index, item);
            _update.OnNext(Unit.Default);
        }
    
        public bool Remove(IObservable<T> item)
        {
            var removed = _list.Remove(item);
            _update.OnNext(Unit.Default);
            return removed;
        }
    
        public void RemoveAt(int index)
        {
            _list.RemoveAt(index);
            _update.OnNext(Unit.Default);
        }
    
        IEnumerator IEnumerable.GetEnumerator() => _list.GetEnumerator();
    }
    
    public class ReadOnlyList<T> : IReadOnlyList<T>
    {
        public ReadOnlyList(IEnumerable<T> items) { _list.AddRange(items); }
    
        private List<T> _list = new List<T>();
    
        public T this[int index] => _list[index];
    
        public int Count => _list.Count;
    
        public IEnumerator<T> GetEnumerator() => _list.GetEnumerator();
    
        IEnumerator IEnumerable.GetEnumerator() => _list.GetEnumerator();
    }
    

    NB: It's not a great idea to implement your own observables - it's easy to get them wrong and create code that doesn't play well with the concurrency.