c#, rx.net

How to use an IObservable<int> to segment an IObservable<T> into an IObservable<T[]> of varying length


I have a "values" IObservable<T> whose elements must be combined, in order, into variable-length arrays, and a "control" IObservable<int> that tells me how long the next array must be. Dropping an element, repeating one, or emitting results out of order makes the output meaningless.

This is for a serial-connected robotics project I'm rewriting in Rx.NET.

IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.SelectMany(length => values.Take(length).ToArray());

I'd like to see something like this:

values  ----A--B--C--D--E--F--G--H-->
control --1-----4---------------2--->
result  ---[A]---------[BCDE]--[FG]->

But my attempt so far produces:

-[A]-[AB]-[ABCD]->

Solution

  • You can build the observable you want from a few helper subjects that prepare intermediate observables. Build them as follows:

    1. Create a new observable that repeats a unique value as many times as the number read from control. From (1, 4, 2) you get (guid_1, guid_2, guid_2, guid_2, guid_2, guid_3, guid_3). Call this observable repeatSize.
    2. Use the Zip() operator to pair each element of values with one element of repeatSize. You get an observable with the values ((A,guid_1), (B,guid_2), (C,guid_2), (D,guid_2), (E,guid_2), (F,guid_3), (G,guid_3)). Call this observable zippedValues.
    3. Subscribe to zippedValues and add each original value to a list. Keep the previous value from repeatSize and compare it with the current one. When it changes (for example from guid_2 to guid_3), you have reached a segment boundary: send the filled list to a new observable, clear the list, and start filling it again with the current value. The last list has no following change, so emit it when zippedValues completes.

    You might need 2-3 Subject<T> objects: subscribe to them, and feed them with OnNext() calls from your subscriptions to the other observables.
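
The three steps above can be sketched roughly as follows. This is only one possible reading of the approach, not a definitive implementation: it uses Observable.Create and built-in operators in place of hand-written Subject<T> objects, and the names SegmentExtensions and SegmentBy are made up for illustration. It assumes the System.Reactive package is referenced and that every control value is positive.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class; the name is not part of Rx.NET.
public static class SegmentExtensions
{
    public static IObservable<T[]> SegmentBy<T>(
        this IObservable<T> values, IObservable<int> control)
    {
        // Step 1: one fresh Guid per control value, repeated `length` times.
        // Concat keeps the runs in the order the control values arrived.
        IObservable<Guid> repeatSize = control
            .Select(length => Observable.Repeat(Guid.NewGuid(), length))
            .Concat();

        // Step 2: pair each value with the marker of the segment it belongs to.
        // Zip queues whichever side is ahead, so no value is dropped or repeated.
        var zippedValues = values.Zip(
            repeatSize, (value, marker) => (Value: value, Marker: marker));

        // Step 3: collect values in a list and emit it when the marker changes.
        return Observable.Create<T[]>(observer =>
        {
            var buffer = new List<T>();
            var current = Guid.Empty;

            return zippedValues.Subscribe(
                pair =>
                {
                    if (buffer.Count > 0 && pair.Marker != current)
                    {
                        observer.OnNext(buffer.ToArray());
                        buffer.Clear();
                    }
                    current = pair.Marker;
                    buffer.Add(pair.Value);
                },
                observer.OnError,
                () =>
                {
                    // Flush the last segment; it may be short if `values`
                    // ended before the segment was full.
                    if (buffer.Count > 0) observer.OnNext(buffer.ToArray());
                    observer.OnCompleted();
                });
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var values = new[] { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
        var control = new[] { 1, 4, 2 }.ToObservable();

        // Prints A, then BCDE, then FG (H has no control value and is not emitted).
        values.SegmentBy(control)
              .Subscribe(segment => Console.WriteLine(new string(segment)));
    }
}
```

Because a boundary is only detected when the marker changes, each array is emitted when the first element of the next segment arrives (or on completion), not when its own last element arrives. If that delay matters, one variation is to carry the expected length alongside the marker and emit as soon as the list is full.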