Tags: c#, observable, system.reactive, rx.net

Rx.Net Window inner observables completing early


Why does

Observable.Interval(TimeSpan.FromSeconds(1)).Window(2, 1).Concat()

yield 0, 1, 2, ...?

My expectation is that .Window(2, 1) would yield a series of pairs of numbers (0, 1, 1, 2, 2, 3, ...) and .Concat() would therefore yield two numbers at a time rather than just one.

I note that

Observable.Interval(TimeSpan.FromSeconds(1)).Window(2, 1).Merge()

does behave as I expect. However, in my use case a .Select() performs work between the .Window() and the .Concat(), and I need to preserve the order of the original sequence (which is not actually Observable.Interval in my real code), so I want to use .Concat() instead.

What am I misunderstanding that causes this difference, and how can I achieve my desired behaviour?


Solution

  • .Concat() subscribes to the next inner observable only after the current one completes. The windows produced by .Window(2, 1) are hot and overlap, so by the time .Concat() subscribes to the next window, that window has already emitted its first value with no subscriber to receive it.

    To fix this, use .Merge() or .SelectMany(x => x) instead of .Concat(), use .Buffer() instead of .Window(), or write your own buffering operator like the following:

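    using System;
    using System.Reactive;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;

    public static class RxExtensions
    {
        // Holds back the items of a hot source until the first subscriber arrives,
        // so that a late subscriber (such as .Concat()) does not miss them.
        public static IObservable<T> CollectUntilSubscribed<T, TUntil>(this IObservable<T> source, IObservable<TUntil> stopMe)
        {
            var subscribersCount = 0;

            // Fires when the first subscriber arrives, releasing the held items.
            var ready = new Subject<Unit>();

            // While nobody is subscribed, each item is delayed until `ready` fires;
            // once there is a subscriber, items pass through without waiting.
            var delayed = source
                .Delay(x => subscribersCount > 0 ? Observable.Return(Unit.Default) : ready)
                .Publish();

            // Connect eagerly so the source is observed from the start.
            var connection = delayed.Connect();

            // Tear down the connection on the first notification from stopMe.
            stopMe.Take(1).Subscribe(_ => connection.Dispose());

            return Observable.Create<T>(o =>
            {
                var sub = delayed.Subscribe(o);

                if (Interlocked.Increment(ref subscribersCount) == 1) ready.OnNext(Unit.Default);

                return Disposable.Create(() =>
                {
                    Interlocked.Decrement(ref subscribersCount);
                    sub.Dispose();
                });
            });
        }
    }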
    
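    internal class Program
    {
        static void Main(string[] args)
        {
            Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Window(2, 1)
                // Wrap each window so its items are held until .Concat() subscribes to it.
                .Select(x => x.CollectUntilSubscribed(Observable.Never<Unit>()))
                .Concat()
                .Subscribe(Console.WriteLine);

            // Keep the process alive while the interval ticks.
            Console.ReadLine();
        }
    }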
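
For comparison, the .Buffer() alternative mentioned above can be sketched as follows. This is a minimal example and not the asker's real pipeline. It assumes the System.Reactive package is referenced, uses Observable.Range in place of Observable.Interval so the run is finite, and `Process` is a hypothetical stand-in for whatever work the .Select() performs. Because each buffer is a completed list rather than a hot observable, nothing is lost when .Concat() subscribes late.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferConcatSketch
{
    // Hypothetical placeholder for the per-pair work done in the asker's .Select().
    private static IObservable<string> Process(IList<int> pair) =>
        Observable.Return(string.Join(",", pair));

    public static IList<string> Run()
    {
        return Observable
            .Range(0, 5)                 // assumption: finite stand-in for Observable.Interval
            .Buffer(2, 1)                // overlapping lists of up to 2 items, advancing by 1
            .Select(b => Process(b))     // do the work per buffer
            .Concat()                    // subscribe to each result in order
            .ToList()
            .Wait();                     // block until the sequence completes
    }

    public static void Main()
    {
        // The first lines printed are the overlapping pairs "0,1" and "1,2".
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The trade-off is latency: .Buffer(2, 1) emits a pair only once its second element has arrived, whereas a window forwards each element as soon as it arrives.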