rx-java2

RxJava takeUntil: wait until the first stream finishes


I have a simulator class that emits items. Emission is controlled through Simulator.emitting -> Observable (true -> started emitting, false -> stopped emitting). The items are exposed through Simulator.items -> Observable.

The items are then processed. Processing is much slower than emitting, and it may occur on another thread.

I am trying to get an observable that signals when "emitting+processing" starts and ends, so:

from: start emitting, 1, 2, 3, end emitting

to: start emitting and processing, 1------, 2-----, 3-----, end emitting and processing

How can I get the emitting+processing observable? I tried using

simulator.items.map { process(it) }.takeUntil(simulator.emitting.filter { it == false })

but this stops as soon as emitting ends, before processing has finished.


Solution

  • It turns out this is straightforward with the zip operator:

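    // Fires when the simulator reports that it has stopped emitting.
    val stoppedEmitting = simulator.emitting.filter { it == false }
    // Cut off the raw items (not the processed ones) when emitting stops.
    val emitted = simulator.items.takeUntil(stoppedEmitting)
    // Processing happens downstream of takeUntil.
    val processed = emitted.map { item -> process(item) }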
    

    The zip operator then waits until the last item has been processed:

    val processingFlow = emitted.zipWith(processed) { item, processedItem -> ... }
    processingFlow.subscribe { }
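
A minimal, self-contained sketch of the approach above, for anyone who wants to try it. The Simulator class, the process function and the runDemo helper are hypothetical stand-ins, not the asker's real code. The sketch assumes RxJava 2 (io.reactivex) is on the classpath, that the simulator is backed by hot PublishSubjects, and that processing is moved to another thread with observeOn, which the question only mentions as a possibility.

```kotlin
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for the simulator described in the question.
class Simulator {
    val emitting: PublishSubject<Boolean> = PublishSubject.create()
    val items: PublishSubject<Int> = PublishSubject.create()
}

// Hypothetical slow processing step (assumption: much slower than emitting).
fun process(item: Int): String {
    Thread.sleep(50)
    return "processed-$item"
}

fun runDemo(): List<String> {
    val simulator = Simulator()

    val stoppedEmitting = simulator.emitting.filter { !it }
    // takeUntil is applied to the raw items, upstream of the processing step.
    val emitted = simulator.items.takeUntil(stoppedEmitting)
    // Assumption: processing runs on a separate thread.
    val processed = emitted.observeOn(Schedulers.single()).map { process(it) }
    // zip pairs each emitted item with its processed result and completes
    // only once the buffered items have all been paired.
    val processingFlow = emitted.zipWith(
        processed,
        BiFunction<Int, String, String> { item, result -> "$item -> $result" }
    )

    val results = mutableListOf<String>()
    val done = CountDownLatch(1)
    processingFlow.subscribe(
        { results += it },     // one pair per processed item
        { done.countDown() },  // error: just release the latch in this sketch
        { done.countDown() }   // completion: emitting and processing are both over
    )

    // Drive the simulator: start, emit three items quickly, stop.
    simulator.emitting.onNext(true)
    simulator.items.onNext(1)
    simulator.items.onNext(2)
    simulator.items.onNext(3)
    simulator.emitting.onNext(false)

    done.await() // returns only after the last item has been processed
    return results
}

fun main() {
    println(runDemo())
}
```

Note that emitted is subscribed twice here (once directly by zipWith and once through processed). That is harmless for a hot source such as the subjects in this sketch, but with a cold source each subscription would trigger its own emission run.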