Tags: android, kotlin, timeout, rx-java2, combinelatest

RxJava Kotlin combineLatest timeout before first element


I'm developing an Android app in Kotlin. I have three real-time data observables whose data comes from Firestore. They are combined with RxJava's Observable.combineLatest() method. I want to set a timeout on the first data retrieval only.

I tried applying timeout() to each of the observables, but they still throw a TimeoutException after the initial data from all three observables has been loaded.

private fun retrieveAllData() {
    Observable.combineLatest(
        retrieveDataObservable1().timeout(10, TimeUnit.SECONDS),
        retrieveDataObservable2().timeout(10, TimeUnit.SECONDS),
        retrieveDataObservable3().timeout(10, TimeUnit.SECONDS),
        Function3<String, String, Boolean, Triple<String, String, Boolean>> { firstResult, secondResult, thirdResult ->
            Triple(firstResult, secondResult, thirdResult)
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { Log.d(TAG, "success") },
            { throwable -> Log.d(TAG, "error", throwable) }
        )
}

private fun retrieveDataObservable1(): Observable<String> {
    return Observable.create<String> { emitter ->
        val listener = dataRef1.addSnapshotListener { snapshot, e ->
            if (e != null) {
                emitter.onError(e)
                return@addSnapshotListener
            }
            emitter.onNext("some value")
        }
        emitter.setCancellable { listener.remove() }
    }
}

I expect a TimeoutException from Observable.combineLatest() on the initial retrieval if at least one of the observables hasn't emitted an item within 10 seconds. Once every observable has emitted data successfully at least once, there should be no TimeoutException.


Solution

  • I think this should work for you:

    import io.reactivex.Observable
    import io.reactivex.functions.Function
    import io.reactivex.functions.Function3
    import java.util.concurrent.TimeUnit
    
    Observable.combineLatest(
        firstObservable,
        secondObservable,
        thirdObservable,
        Function3<String, String, String, String> { first, second, third -> "$first$second$third" })
        .timeout<String, String>(
           Observable.empty<String>().delay(10, TimeUnit.SECONDS),
           Function { Observable.never<String>() }
        )
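        .subscribe(
            { println(it) },
            // Handle the TimeoutException (or any upstream error) instead of letting it crash.
            { error -> println("error: $error") }
        )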
    

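    The first parameter of the timeout() operator is an Observable that acts as the timeout for the first item; here it completes after 10 seconds, which triggers the timeout if nothing has been emitted by then. The second parameter is a function that returns the timeout Observable for each subsequent item. It returns Observable.never() for every item, so no timeout is ever triggered after the first emission.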
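To check this behaviour without waiting in real time, here is a minimal sketch that runs the same operator chain on RxJava 2's TestScheduler. The names combineWithFirstItemTimeout and Combined are hypothetical, the PublishSubjects stand in for the Firestore-backed observables from the question, and Observable.timer() is swapped in for Observable.empty().delay() as the first-item indicator (either an emission or a completion from the indicator signals the timeout).

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Scheduler
import io.reactivex.functions.Function
import io.reactivex.functions.Function3
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical alias, only to keep the signatures short.
typealias Combined = Triple<String, String, Boolean>

// Hypothetical helper: combine three sources and time out only while
// waiting for the first combined item.
fun combineWithFirstItemTimeout(
    first: Observable<String>,
    second: Observable<String>,
    third: Observable<Boolean>,
    scheduler: Scheduler
): Observable<Combined> =
    Observable.combineLatest(
        first, second, third,
        Function3<String, String, Boolean, Combined> { a, b, c -> Triple(a, b, c) }
    ).timeout<Long, Long>(
        // First-item indicator: fires 10 seconds after subscription.
        Observable.timer(10, TimeUnit.SECONDS, scheduler),
        // Per-item indicator: never fires, so later gaps are unlimited.
        Function<Combined, ObservableSource<Long>> { Observable.never<Long>() }
    )

fun main() {
    val scheduler = TestScheduler() // virtual time, so nothing really waits
    val first = PublishSubject.create<String>()
    val second = PublishSubject.create<String>()
    val third = PublishSubject.create<Boolean>()

    // Case 1: all sources emit before the deadline, then stay quiet for a minute.
    val ok = combineWithFirstItemTimeout(first, second, third, scheduler).test()
    first.onNext("a")
    second.onNext("b")
    third.onNext(true)
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
    ok.assertValue(Triple("a", "b", true)).assertNoErrors()
    ok.dispose()

    // Case 2: the third source never emits, so the first-item timeout fires.
    val late = combineWithFirstItemTimeout(first, second, third, scheduler).test()
    first.onNext("a")
    second.onNext("b")
    scheduler.advanceTimeBy(9, TimeUnit.SECONDS)
    late.assertNoErrors()
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    late.assertError(TimeoutException::class.java)
}
```

Because the timeout is applied after combineLatest(), the deadline covers the first combined item, which only exists once all three sources have emitted at least once. In the Android code from the question, the scheduler argument could be dropped so that Observable.timer() uses its default computation scheduler.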