
How can I conditionally add an asynchronous operation in the middle of an RxJava stream?


Here's a simplified version of what I'm trying to do (using Kotlin and RxJava):

makeServerCall()
    .doOnNext {
        doStuff(it)
    }
    //TODO: if it == 0, call asyncOperation() and wait for its callback to fire
    //before running the rest of the stream. Otherwise immediately run the rest
    //of the stream
    .flatMap {
        observable1(it)
        observable2(it)
        Observable.merge(
            getSpotSearchObservable(observable1),
            getSpotSearchObservable(observable2)
        )
    }
    .subscribeBy(onNext = {
        allDone()
        view?
    })

How do I squeeze in the call to asyncOperation() and make the rest of the stream wait for its callback to fire, but only when a certain condition is met? This seems like it's probably a trivial operation in Rx, but no obvious solution is coming to mind.


Solution

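  • FlatMap it! When the value is 0, run asyncOperation(), ignore whatever it emits, and re-emit the value once it completes; otherwise pass the value straight through.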

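    .flatMap {
        // asyncOperation() is assumed to return an Observable that completes
        // once its callback has fired.
        if (it == 0) {
            asyncOperation()
                .ignoreElements()               // drop its items, keep only completion
                .andThen(Observable.just(it))   // then re-emit the original value
        } else {
            Observable.just(it)                 // condition not met: pass straight through
        }
    }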
    .flatMap {
        observable1(it)
        observable2(it)
        Observable.merge(
            getSpotSearchObservable(observable1),
            getSpotSearchObservable(observable2)
        )
    }
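
The snippet above assumes asyncOperation() already returns an Observable. If it is a plain callback-style function instead, one option is to wrap it in a Completable first. The following is a minimal, self-contained sketch of that idea, not the original poster's code: it assumes RxJava 3 package names, and asyncOperation(callback), asyncOperationCompletable(), gate() and asyncCalls are hypothetical names made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.atomic.AtomicInteger

// Counts how often the async operation actually ran (demonstration only).
val asyncCalls = AtomicInteger(0)

// Hypothetical callback-style API standing in for asyncOperation().
fun asyncOperation(callback: () -> Unit) {
    Thread {
        Thread.sleep(50)              // simulate some background work
        asyncCalls.incrementAndGet()
        callback()                    // signal that the work is done
    }.start()
}

// Bridge to Rx: the Completable completes when the callback fires.
fun asyncOperationCompletable(): Completable =
    Completable.create { emitter ->
        asyncOperation { emitter.onComplete() }
    }

// Runs the async operation only when the value is 0, then re-emits the value.
fun gate(source: Observable<Int>): Observable<Int> =
    source.flatMap { value ->
        if (value == 0) {
            asyncOperationCompletable().andThen(Observable.just(value))
        } else {
            Observable.just(value)
        }
    }

fun main() {
    println(gate(Observable.just(0)).toList().blockingGet()) // prints [0] after the callback fires
    println(gate(Observable.just(7)).toList().blockingGet()) // prints [7] without running the async operation
    println(asyncCalls.get())                                // prints 1
}
```

In the real stream, the body of gate() would sit between doOnNext and the final flatMap. If the upstream can emit several items and their order matters, concatMap is probably a safer choice than flatMap, since flatMap lets a later item overtake one that is still waiting on the async operation.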