Originally I started out trying to solve the following question:
I have a websocket client I wrote that needs to be periodically migrated using a well defined client protocol. The current implementation of the client I have exposes an IObservable for each websocket message we receive so that clients can consume it. The original thought I had was to find a way to chain together one client after another so that each client's IObservable sequence would get appended to some aggregated IObservable that could be consumed by developers. Unfortunately I couldn't find an easy way to do this as calls like Concat (understandably) generate a new sequence object I would need to subscribe to.
But when I started trying to put together a test bed to explore concepts, I put the following sample together:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
ISubject<int> valuesSubject = new Subject<int>();
valuesSubject.Subscribe(Console.WriteLine);
int total = 0;
foreach (int i in Enumerable.Range(0, 9))
{
var start = i * 5;
var end = i * 5 + 4;
Observable.Range(start, end).Subscribe(valuesSubject.OnNext);
}
I was shocked to find the output wasn't 0->49, but rather contained some very unexpected values outside of this range.
Here's a snippet of some values:
...
68
69
70
71
72
73
40
41
42
43
44
45
46
47
48
49
50
51
...
So I can only assume there's something here I fundamentally don't understand. Can anyone help?
Observable.Range(int start, int count)
has second parameter count
, not end
(which, actually, matches Enumarable.Range
signature):
The number of sequential integers to generate.