Why does Observable.Interval(TimeSpan.FromSeconds(1)).Window(2, 1).Concat() yield 0, 1, 2, ...?
My expectation is that .Window(2, 1) would yield a series of pairs of numbers (0, 1, then 1, 2, then 2, 3, ...) and .Concat() would therefore yield two numbers at a time rather than just one.
I note that Observable.Interval(TimeSpan.FromSeconds(1)).Window(2, 1).Merge() does act as expected. However, in my use case I have a .Select() performing work between the .Window() and the .Concat(), and it's important that I preserve the order of the original sequence (not actually Observable.Interval in my real code), hence looking to use .Concat() instead.
What am I misunderstanding that causes this difference, and how can I achieve my desired behaviour?
Because .Concat() only subscribes to the next inner observable after the current one completes. The windows produced by .Window(2, 1) overlap and are hot, so by the time .Concat() subscribes to the "next" window, that window has already emitted its first value, and nobody was there to consume it.
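To see the effect without waiting on timers, here is a minimal sketch that pushes values through a Subject by hand. The class name ConcatWindowDemo is made up for illustration, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

internal static class ConcatWindowDemo
{
    private static void Main()
    {
        var source = new Subject<int>();

        // Same shape as in the question: overlapping windows, then Concat.
        source
            .Window(2, 1)
            .Concat()
            .Subscribe(x => Console.WriteLine(x));

        // Each value is pushed into every window that is currently open.
        // Concat is only listening to the oldest one, so the copy delivered
        // to the newer, not-yet-subscribed window is dropped.
        for (var i = 0; i < 4; i++)
        {
            source.OnNext(i);
        }

        // Prints 0, 1, 2, 3: each value once, not in overlapping pairs.
    }
}
```

When window N completes, .Concat() moves on to window N+1, which has already seen (and lost) its first element, so only its second element is ever printed.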
Use either .Merge() or .SelectMany(x => x) instead of .Concat(), or use .Buffer, or create your own buffering operator like the following:
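using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class RxExtensions
{
    // Holds back the values of a hot source until the first observer subscribes,
    // then releases them. stopMe tears down the underlying connection.
    public static IObservable<T> CollectUntilSubscribed<T, TUntil>(this IObservable<T> source, IObservable<TUntil> stopMe)
    {
        var subscribersCount = 0;
        var ready = new Subject<Unit>();

        // Delay each value until "ready" fires; once someone is subscribed, pass it straight through.
        var delayed = source
            .Delay(x => subscribersCount > 0 ? Observable.Return(Unit.Default) : ready)
            .Publish();

        // Connect immediately so nothing emitted by the source is missed.
        var connection = delayed.Connect();
        stopMe.Take(1).Subscribe(_ => connection.Dispose());

        return Observable.Create<T>(o =>
        {
            var sub = delayed.Subscribe(o);
            // The first subscriber releases everything collected so far.
            if (Interlocked.Increment(ref subscribersCount) == 1) ready.OnNext(Unit.Default);
            return Disposable.Create(() =>
            {
                Interlocked.Decrement(ref subscribersCount);
                sub.Dispose();
            });
        });
    }
}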
internal class Program
{
    static void Main(string[] args)
    {
        Observable
            .Interval(TimeSpan.FromSeconds(1))
            .Window(2, 1)
            .Select(x => x.CollectUntilSubscribed(Observable.Never<Unit>()))
            .Concat()
            .Subscribe(Console.WriteLine);
        Console.ReadLine();
    }
}
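For the .Buffer route mentioned above, a minimal sketch could look like the following. The class name BufferSketch is made up for illustration. Note that .Buffer(2, 1) only hands over a pair once it is full, so values arrive in bursts of two rather than one at a time as they would from a window; whether that is acceptable depends on your real pipeline.

```csharp
using System;
using System.Reactive.Linq;

internal static class BufferSketch
{
    private static void Main()
    {
        Observable
            .Interval(TimeSpan.FromSeconds(1))
            // Overlapping lists instead of overlapping hot windows:
            // [0, 1], [1, 2], [2, 3], ...
            .Buffer(2, 1)
            // Any per-pair work (the .Select() from the question) would go here.
            // Flatten each list in place; order within and across pairs is kept.
            .SelectMany(pair => pair)
            .Subscribe(Console.WriteLine);

        // Prints 0, 1, 1, 2, 2, 3, ...
        Console.ReadLine();
    }
}
```

Because each buffer is a plain list that already holds its values, nothing can be missed by subscribing late, which is what made .Concat() lose data with windows.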