I got a simulator class that emit items, it can be controlled to start emitting by: Simulator.emiting -> Observable, true -> started emitting, false, stopped emitting The items are exposed: Simulator.items -> Observable
The items will be processed, processing happen much slower than emitting (processing potentially occurs on another thread)
I am trying to get an observable that signal when "emitting+processing" starts and ends, so:
from: start emitting , 1, 2, ,3, end emitting
to: start emitting and processing, 1------, 2-----, 3-----, end emitting and processing
how can I get the emitting+processing observable ? I tried using
simulator.items.map { process(it) }.takeUntil(simulator.emitting.filter { it == false }) but this will stop before processing finishes.
So it looks like this is a trivial problem, using zip operator
val stoppedEmitting = simulator.emitting.filter { it == false }
val emitted = simulator.items.takeUntil(stoppedEmitting )
val processed = emitted.map { item -> process(item) }
then "zip" op will wait until the last item get processed:
val processingFlow = emitted.zipWith(processed) { item, processedItem -> ... }
processing.subscribe { }