Astonishing behavior with ReactiveX for .NET - Using Subject<int> to handle messages being pushed from a series of IObservable<int> objects


Originally I started out trying to solve the following question:

I wrote a websocket client that needs to be migrated periodically using a well-defined client protocol. My current implementation exposes the websocket messages we receive as an IObservable so that callers can consume them. My original idea was to chain one client after another, so that each client's IObservable sequence would be appended to a single aggregated IObservable that developers could consume. Unfortunately, I couldn't find an easy way to do this, because calls like Concat (understandably) produce a new sequence object that I would need to subscribe to.

But when I started trying to put together a test bed to explore concepts, I put the following sample together:

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

ISubject<int> valuesSubject = new Subject<int>();
valuesSubject.Subscribe(Console.WriteLine);

int total = 0;
foreach (int i in Enumerable.Range(0, 9))
{
    var start = i * 5;
    var end = i * 5 + 4;
    Observable.Range(start, end).Subscribe(valuesSubject.OnNext);
}

I was shocked to find the output wasn't 0->49, but rather contained some very unexpected values outside of this range.

Here's a snippet of some values:

...
68
69
70
71
72
73
40
41
42
43
44
45
46
47
48
49
50
51
...

So I can only assume there's something here I fundamentally don't understand. Can anyone help?


Solution

  • The second parameter of Observable.Range(int start, int count) is a count, not an end value (which matches the Enumerable.Range signature). The documentation describes it as:

    The number of sequential integers to generate.
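
To make this concrete: in the question's loop, the iteration with i = 7 calls Observable.Range(35, 39), which emits 39 values (35 through 73), and the next iteration starts again at 40. That matches the 73 followed by 40 in the posted output. The same rule applies to Enumerable.Range(0, 9), which yields only nine values (0 through 8), so ten chunks are needed to reach 49. Below is a minimal sketch of the sample with both calls adjusted, assuming the System.Reactive package and top-level statements; the names chunkSize, chunkCount and received are mine and only there for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

const int chunkSize = 5;   // values emitted by each inner sequence
const int chunkCount = 10; // 10 chunks of 5 values cover 0 through 49

// Hypothetical helper list, used only to inspect what the subject forwarded.
var received = new List<int>();

ISubject<int> valuesSubject = new Subject<int>();
valuesSubject.Subscribe(value => received.Add(value));
valuesSubject.Subscribe(Console.WriteLine);

foreach (int i in Enumerable.Range(0, chunkCount))
{
    // The second argument is a count, so each call emits exactly chunkSize values
    // starting at i * chunkSize.
    // Only OnNext is forwarded: passing the subject itself as the observer would
    // also forward OnCompleted and end the subject after the first chunk.
    Observable.Range(i * chunkSize, chunkSize).Subscribe(valuesSubject.OnNext);
}

Console.WriteLine($"Received {received.Count} values, last = {received.Last()}");
```

With these arguments the subject should see 0 through 49 exactly once each, in order, since each Observable.Range subscription here runs to completion before the loop moves on.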