rx-java2 rx-kotlin2

How to concatEagerDelayError in RxJava2


How can I implement an Observable.concatEagerDelayError, or an equivalent, in RxJava2/RxKotlin2?

There are:

  • Observable.concatEager
  • Observable.concatDelayError

But there is no:

  • Observable.concatEagerDelayError

What I have:

fun getAll(): Observable<List<User>> = Observable.concatArrayDelayError(
    // from db
    userDAO
        .selectAll()
        .subscribeOn(ioScheduler),
    // from api
    userAPI
        .getAll()
        .doOnNext { lstUser -> Completable.concatArray(
            userDAO.deleteAll().subscribeOn(ioScheduler),
            userDAO.save(lstUser).subscribeOn(ioScheduler)
        ) }
        .subscribeOn(ioScheduler)
)

I want the same behaviour, but with selectAll() and getAll() subscribed eagerly, because there is no reason to wait for the db before launching the network call.


Solution

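  • Use concatMapEagerDelayError with an identity mapper. Passing true for tillTheEnd delays every error until all sources have terminated: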

     // sources: an Iterable of Observables
     Observable.fromIterable(sources)
         .concatMapEagerDelayError(v -> v, true);

     // or, for a fixed set of sources
     Observable.fromArray(source1, source2, source3)
         .concatMapEagerDelayError(v -> v, true);
    

    See the JavaDoc for concatMapEagerDelayError.

    Edit:

    fun getAll(): Observable<List<User>> = Observable.fromArray(
        // from db
        userDAO
            .selectAll()
            .subscribeOn(ioScheduler),
        // from api
        userAPI
            .getAll()
            // --- this has no effect by the way: the Completable is
            // --- created but never subscribed, so nothing is saved
            .doOnNext { lstUser -> Completable.concatArray(
                userDAO.deleteAll().subscribeOn(ioScheduler),
                userDAO.save(lstUser).subscribeOn(ioScheduler)
            ) }
            // ------------------------------------------------------
            .subscribeOn(ioScheduler)
    )
    .concatMapEagerDelayError({ v -> v }, true)
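
The doOnNext block in the snippet above only builds a Completable and never subscribes to it, so the cache is never refreshed. One possible way to make the save part of the stream is to swap doOnNext for concatMap and re-emit the list with andThen. This is a minimal sketch, not a definitive implementation: the User, UserDAO, UserAPI and UserRepository types are hypothetical stand-ins, since the question does not show the real ones, and it assumes deleteAll() and save() return Completable.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Scheduler

// Hypothetical types: the question does not show the real DAO/API signatures.
data class User(val name: String)

interface UserDAO {
    fun selectAll(): Observable<List<User>>
    fun deleteAll(): Completable
    fun save(users: List<User>): Completable
}

interface UserAPI {
    fun getAll(): Observable<List<User>>
}

class UserRepository(
    private val userDAO: UserDAO,
    private val userAPI: UserAPI,
    private val ioScheduler: Scheduler
) {
    fun getAll(): Observable<List<User>> = Observable.fromArray(
        // from db
        userDAO
            .selectAll()
            .subscribeOn(ioScheduler),
        // from api
        userAPI
            .getAll()
            // concatMap subscribes to the returned Observable, so the
            // Completable actually runs: delete, then save, then re-emit.
            .concatMap { lstUser ->
                Completable.concatArray(
                    userDAO.deleteAll(),
                    userDAO.save(lstUser)
                )
                    .subscribeOn(ioScheduler)
                    .andThen(Observable.just(lstUser))
            }
            .subscribeOn(ioScheduler)
    )
        // both sources are subscribed at once, values are emitted in
        // source order, and errors are held back until both have finished
        .concatMapEagerDelayError({ v -> v }, true)
}
```

With this shape, a failure in deleteAll() or save() surfaces as an error of the api source, and because of the delay-error flag it reaches the subscriber only after the db source has terminated as well.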