I have a "values" IObservable<T>
which is returning T
elements which must be combined in sequence into variable-length arrays, and I have a "control" IObservable<int>
which tells me how long the next array must be.
Dropping an element, repeating it, or getting the results out of order will make the results meaningless.
This is for a serial-connected robotics project I'm rewriting in Rx.NET.
IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.SelectMany(length => values.Take(length).ToArray());
I'd like to see something like this:
values ----A--B--C--D--E--F--G--H-->
control --1-----4---------------2--->
result ---[A]---------[BCDE]--[FG]->
But my attempt so far results in
-[A]-[AB]-[ABCD]->
You can create several helper subject to prepare/create new observables to build the new observable you want. You can build subjects for these kind of observables:
control
. From (1, 4, 2)
you will get (guid_1, guid_2, guid_2, guid_2, guid_2, guid_3, guid_3)
. Call this observable repeatSize
.Zip()
operator to combine one value each from repeatSize
and values
. You will get an observable with the values: ((A,guid_1), (B,guid_2), (C,guid_2), (D,guid_2), (E,guid_2), (F,guid_3), (G,guid_3))
. Call this observable zippedValues
.zippedValues
and put/add the original value in a list. Also save the previous value from the repeatSize
observable. Compare it with the current value of repeatSize
. When it has been changed (like from guid_2
to guid_3
) you know you have reached the end/start, so you can send the filled list to a new observable. After that you reset the list again and begin filling it up again.You might need to build 2-3 Subject<T>
objects, subscribe to them and use several OnNext()
calls to fill them from subscription of other observables.