c#, system.reactive

How to merge a hot and cold observable of the same source while avoiding duplicates efficiently?


I have a source of elements. It can be queried and it publishes events when elements are added. In other words, I can make a cold observable from the query results, and a hot observable from the event.

One approach is to merge the cold and the hot.

var o = cold.Merge(hot);

It works, but it can miss elements that are added after the cold observable takes its snapshot and before the hot observable is subscribed to.

So the other approach is to merge the hot and the cold.

var o = hot.Merge(cold);

This works too and avoids missing elements, but it produces duplicates: elements added after the hot observable is subscribed to, but before the cold observable takes its snapshot, are emitted by both.
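To make the two windows concrete, here is a single-threaded sketch that replaces the Rx subscriptions with a plain event and a snapshot query. DemoSource and RaceWindows are hypothetical names: attaching the event handler stands in for subscribing to the hot observable, and Query() stands in for subscribing to the cold one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the collection: Query() is the cold side,
// the Added event is the hot side.
public sealed class DemoSource
{
    private readonly List<int> items = new();
    public event Action<int>? Added;

    public void Add(int item)
    {
        items.Add(item);
        Added?.Invoke(item);
    }

    public List<int> Query() => new(items);
}

public static class RaceWindows
{
    // Cold first (like cold.Merge(hot)): an element added after the
    // snapshot but before the handler is attached is never seen.
    public static List<int> ColdThenHot()
    {
        var source = new DemoSource();
        var seen = new List<int>();
        source.Add(1);
        seen.AddRange(source.Query()); // snapshot contains 1
        source.Add(2);                 // falls into the gap
        source.Added += seen.Add;      // attached too late for 2
        source.Add(3);
        return seen;                   // 1, 3
    }

    // Hot first (like hot.Merge(cold)): an element added after the handler
    // is attached but before the snapshot is seen twice.
    public static List<int> HotThenCold()
    {
        var source = new DemoSource();
        var seen = new List<int>();
        source.Add(1);
        source.Added += seen.Add;
        source.Add(2);                 // seen live...
        seen.AddRange(source.Query()); // ...and again in the snapshot (1, 2)
        source.Add(3);
        return seen;                   // 2, 1, 2, 3
    }
}
```

With real threads the gap is much narrower, but it is the same gap, which is why the sample below needs a million concurrent adds to show it reliably.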

What about filtering out duplicate elements?

var o = hot.Merge(cold).Distinct();

This solves the problem, but Distinct has to keep a set of every element it has seen in order to detect duplicates, so its memory use grows for as long as the subscription lives. I'm interested in finding a way to get the behaviour of Distinct without using more and more memory over time.

I believe the solution is something like:

Subscribe a subject to the hot observable, paused (?) so that it buffers what it receives. Subscribe the observer to the cold observable. Then replay the subject's buffer to the observer, but with the check for distinctness. Finally, unpause the subject (or however that would work) and subscribe the observer to it directly, without the distinctness check.
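A minimal sketch of this idea, written against only the BCL's IObservable<T>/IObserver<T> interfaces rather than System.Reactive subjects. BufferThenReplay and Relay are hypothetical names, and the cold side is assumed to be a snapshot function returning IEnumerable<T> instead of an IObservable<T>. The HashSet and the buffer only live until the switch to pass-through, so memory does not grow over time. One assumption to be aware of: if an element is in the snapshot, its hot event must arrive before the switch; a source that raises events outside its own lock (as ObservableAddOnlyCollection.Add does) can still deliver a very late duplicate.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: buffer the hot side, replay the cold snapshot,
// drain the buffer with a distinctness check, then go live.
public sealed class BufferThenReplay<T> : IObservable<T>
{
    private readonly Func<IEnumerable<T>> snapshot; // assumed: queries the source
    private readonly IObservable<T> hot;
    private readonly IEqualityComparer<T> comparer;

    public BufferThenReplay(Func<IEnumerable<T>> snapshot, IObservable<T> hot,
                            IEqualityComparer<T>? comparer = null)
    {
        this.snapshot = snapshot;
        this.hot = hot;
        this.comparer = comparer ?? EqualityComparer<T>.Default;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var relay = new Relay(observer);
        // 1. Subscribe to the hot side first so nothing is missed; it buffers for now.
        var subscription = hot.Subscribe(relay);

        // 2. Replay the cold snapshot, remembering what was sent. While the relay
        //    is buffering, only this thread calls the observer.
        var sent = new HashSet<T>(comparer);
        foreach (var item in snapshot())
        {
            sent.Add(item);
            observer.OnNext(item);
        }

        // 3. Drain the buffer, skipping what the snapshot already sent, and switch
        //    to pass-through. 'sent' becomes garbage once this method returns.
        relay.GoLive(sent);
        return subscription;
    }

    private sealed class Relay : IObserver<T>
    {
        private readonly object gate = new();
        private readonly IObserver<T> observer;
        private Queue<T>? buffer = new(); // null once live
        private Exception? pendingError;
        private bool pendingCompleted;

        public Relay(IObserver<T> observer) => this.observer = observer;

        public void GoLive(HashSet<T> sent)
        {
            lock (gate)
            {
                foreach (var item in buffer!)
                    if (!sent.Contains(item))
                        observer.OnNext(item);
                buffer = null;
                // Terminal notifications that arrived while buffering go out last.
                if (pendingError is not null) observer.OnError(pendingError);
                else if (pendingCompleted) observer.OnCompleted();
            }
        }

        public void OnNext(T value)
        {
            lock (gate)
            {
                if (buffer is not null) buffer.Enqueue(value);
                else observer.OnNext(value);
            }
        }

        public void OnError(Exception error)
        {
            lock (gate)
            {
                if (buffer is not null) pendingError = error;
                else observer.OnError(error);
            }
        }

        public void OnCompleted()
        {
            lock (gate)
            {
                if (buffer is not null) pendingCompleted = true;
                else observer.OnCompleted();
            }
        }
    }
}
```

The lock serializes live notifications with the drain, so no hot element can overtake the buffered ones once the relay goes live.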

Below is sample code that demonstrates the behaviours described above.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

var strategies = new[]
{
    WarmObservable.WithMissesFrom<Int32>,
    WarmObservable.WithDuplicatesFrom,
    WarmObservable.WithCorrectButHighMemoryUsageFrom,
    //WarmObservable.From
};
foreach (var strategy in strategies)
{
    var observed = new List<Int32>();
    var nums = new ObservableAddOnlyCollection<Int32>();
    var warm = strategy(nums, nums.Added);
    const Int32 max = 1_000_000;
    Parallel.For(0, max, i =>
    {
        nums.Add(i);
        if (i == max / 2)
            warm.Subscribe(observed.Add);
    });
    Console.WriteLine($"{observed.Count:N0} elements observed using {strategy.Method.Name}.");
}

public sealed class ObservableAddOnlyCollection<T> : IObservable<T>
{
    private readonly List<T> items = new();
    private event Action<T> added = _ => {};

    public IObservable<T> Added => Observable.FromEvent<T>(h => added += h, h => added -= h);
    public void Add(T item) { lock (items) items.Add(item); added.Invoke(item); }
    public IDisposable Subscribe(IObserver<T> observer) => ToList().ToObservable().Subscribe(observer);
    private List<T> ToList() { lock (items) return items.ToList(); }
}

public static class WarmObservable
{
    public static IObservable<T> WithMissesFrom<T>(IObservable<T> cold, IObservable<T> hot)
    {
        return cold.Merge(hot);
    }
    
    public static IObservable<T> WithDuplicatesFrom<T>(IObservable<T> cold, IObservable<T> hot)
    {
        return hot.Merge(cold);
    }
    
    public static IObservable<T> WithCorrectButHighMemoryUsageFrom<T>(IObservable<T> cold, IObservable<T> hot)
    {
        return hot.Merge(cold).Distinct();
    }
    
    public static IObservable<T> From<T>(IObservable<T> cold, IObservable<T> hot)
    {
        // merge a cold observable with a hot observable, without losing any elements and without duplicating any elements
        // also don't require an increasing memory footprint to distinguish duplicates
        throw new NotImplementedException();
    }
}

Solution

  • I realized that implementing a wrapper IObserver<T> class makes it trivial to turn off the distinct check once the cold observable has been enumerated.

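    Here's the working solution:

    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Threading;

    public static class WarmObservable
    {
        // Convenience overload so From fits the (cold, hot) strategy signature used in the sample.
        public static IObservable<T> From<T>(IObservable<T> cold, IObservable<T> hot)
            => From(cold, hot, EqualityComparer<T>.Default);

        public static IObservable<T> From<T>(IObservable<T> cold, IObservable<T> hot, IEqualityComparer<T> ec)
        {
            // merge a cold observable with a hot observable, without losing any elements and without duplicating any elements
            // also don't require an increasing memory footprint to distinguish duplicates
            return new IgnoreDuplicatesUntilColdCompletesObservable<T>(cold, hot, ec);
        }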
        
        private sealed class IgnoreDuplicatesUntilColdCompletesObservable<T>(IObservable<T> cold, IObservable<T> hot, IEqualityComparer<T> ec) : IObservable<T>
        {
            public IDisposable Subscribe(IObserver<T> observer)
            {
                var o = new DistinctObserver(observer, ec);
                return hot.Merge(cold.Finally(() => o.OnColdCompleted())).Subscribe(o);
            }
        
            private sealed class DistinctObserver(IObserver<T> o, IEqualityComparer<T> ec) : IObserver<T>
            {
                private HashSet<T>? set = new(ec);
    
                public void OnColdCompleted() => Interlocked.Exchange(ref set, null);
    
                public void OnCompleted() => o.OnCompleted();
                public void OnError(Exception error) => o.OnError(error);
    
                public void OnNext(T value)
                {
                    if (set is not null && Interlocked.CompareExchange(ref set, null, null) is {} x)
                    {
                        if (x.Add(value))
                        {
                            o.OnNext(value);
                        }
                    }
                    else
                    {
                        o.OnNext(value);
                    }
                }
            }
        }
    }
    

    Optimization explanation:

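    The plain "set is not null" check runs before the interlocked read as a fast path. The set field only ever changes once, from a HashSet to null, so a plain read that sees null is definitive and the interlocked read can be skipped from then on. A plain read that still sees a non-null value may be stale, which is why the code then falls back to Interlocked.CompareExchange(ref set, null, null) to fetch the current value.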
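For comparison, the same one-way switch can be expressed with Volatile.Read and Volatile.Write instead of Interlocked; a volatile read typically compiles to an ordinary load on x86/x64. This is a sketch under the same assumption the Rx solution relies on: Merge serializes the OnNext calls, so the HashSet itself is never touched concurrently, while the end of the cold phase may be signalled from another thread. ColdPhaseFilter is a hypothetical name, not part of System.Reactive.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-alone version of DistinctObserver's filtering logic.
// Assumes ShouldForward calls are serialized (as Merge guarantees), while
// EndColdPhase may run concurrently on another thread.
public sealed class ColdPhaseFilter<T>
{
    private HashSet<T>? set;

    public ColdPhaseFilter(IEqualityComparer<T> comparer) => set = new HashSet<T>(comparer);

    // Called once the cold observable has terminated; the set becomes garbage.
    public void EndColdPhase() => Volatile.Write(ref set, null);

    // True if the value should be forwarded downstream. While the cold phase
    // lasts, only the first occurrence of each value is forwarded.
    public bool ShouldForward(T value)
    {
        var current = Volatile.Read(ref set);
        return current is null || current.Add(value);
    }
}
```

DistinctObserver.OnNext would then reduce to "if (filter.ShouldForward(value)) o.OnNext(value);", with OnColdCompleted calling EndColdPhase.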