How to handle error on multiple chained Observable in RxJava?


I am developing an Android app using Kotlin, RxJava, and Retrofit. I need to send two HTTP requests to the server:

  1. PUT - update the job's options
  2. POST - run the job

The second request should be sent only after the first one succeeds, so I used concatMap.

val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST

updateJob.concatMap { runJob }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        Log.d(TAG, "runJob - success: $job")
    }, {
        Log.e(TAG, "runJob - failed: ${it.message}")
        it.printStackTrace()
    })

What I want is to skip the second request if the first one fails. How should I do this?

Here is one possible approach, but I think it is ugly. Is there a cleaner way?

disposable.add(
            restService.updateJob(token, job.id, options)    // PUT
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ job ->
                    Log.d(TAG, "updateJob - success")
                    restService.runJob(token, job.id)    // POST
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe({ job ->
                            Log.d(TAG, "runJob - success")
                        }, {
                            Log.e(TAG, "runJob - failed: ${it.message}")
                            it.printStackTrace()
                        })
                }, {
                    Log.e(TAG, "updateJob - failed: ${it.message}")
                    it.printStackTrace()
                })
        )

I have one more question. I have a list of jobs and want to do the same thing for each of them. Even if some jobs fail, I want to continue with the remaining jobs. I considered onErrorResumeNext, onErrorReturn, and doOnError, but they did not solve it. How can I do this?

Observable.fromIterable(jobs)
            .concatMap { job ->
                val updateJob = restService.updateJob(token, job.id, options)
                val printJob = restService.printJob(token, job.id)

                updateJob.concatMap { printJob }
            }
            .window(1)    // I thought "window" can be the solution. But it doesn't work.
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ job ->
                Log.d(TAG, "runJobs - success")

            }, {
                Log.e(TAG, "runJobs - failed: ${it.message}")
                it.printStackTrace()
            })

Solution

  • Actually, you have already given the answer. Your first case is correct:

    val updateJob = restService.updateJob(token, job.id, options) // PUT
    val runJob = restService.runJob(token, job.id) // POST
    
    updateJob.concatMap { runJob }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ job ->
            Log.d(TAG, "runJob - success: $job")
        }, {
            Log.e(TAG, "runJob - failed: ${it.message}")
            it.printStackTrace()
        })
    

    In this case, if the updateJob request fails, the stream goes straight to the error handler and the runJob request is never made.

    runJob is called only when updateJob succeeds.

    If updateJob succeeds and runJob then fails, the error handler is called as well.

    There is no need for your second solution.
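
    The short-circuit behaviour described above can be checked with a small sketch. It assumes RxJava 2 (the io.reactivex package; RxJava 3 uses io.reactivex.rxjava3.core instead), and the chain function and the String observables are hypothetical stand-ins for restService.updateJob and restService.runJob, not the real Retrofit calls.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-in for updateJob.concatMap { runJob } from the question.
fun chain(updateJob: Observable<String>, runJob: Observable<String>): Observable<String> =
    updateJob.concatMap { runJob }

fun main() {
    var runJobSubscribed = false

    // Simulate a failing PUT request.
    val failingUpdate = Observable.error<String>(RuntimeException("update failed"))
    // Record whether the POST request would ever have been started.
    val runJob = Observable.just("job started").doOnSubscribe { runJobSubscribed = true }

    chain(failingUpdate, runJob).subscribe(
        { println("success: $it") },
        { println("failed: ${it.message}") }   // prints "failed: update failed"
    )
    println("runJob subscribed: $runJobSubscribed")   // prints "runJob subscribed: false"
}
```

    Because the mapper passed to concatMap runs only for items emitted by updateJob, an error from updateJob skips it entirely and runJob is never subscribed.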

    For your second question, onErrorResumeNext should work. Return a dummy value and handle it in onNext:

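    Observable.fromIterable(jobs)
        // On failure, emit a dummy value instead of terminating the stream.
        // The fallback must be an Observable to match the Observable chain.
        .concatMap { job -> restService.updateJob(token, job.id, options).onErrorResumeNext { _: Throwable -> Observable.just(/*Send any dummy value*/) } }
        // Here "job" is the value emitted by the previous step (possibly the dummy value).
        .concatMap { job -> restService.printJob(token, job.id).onErrorResumeNext { _: Throwable -> Observable.just(/*Send any dummy value*/) } }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ job ->
            /*If dummy value received....handle it gracefully*/
            Log.d(TAG, "runJobs - success")
        }, {
            Log.e(TAG, "runJobs - failed: ${it.message}")
            it.printStackTrace()
        })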
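
    As a possible alternative to a dummy value, each job's outcome could be wrapped in a small result type, with the error handling applied to the inner chain inside concatMap. An error terminates only the Observable it occurs in, so recovering inside the inner chain keeps the outer stream alive for the next job. This is only a sketch, assuming RxJava 2 (io.reactivex): JobResult, runAll, and the updateJob/printJob function parameters are hypothetical names standing in for the restService calls.

```kotlin
import io.reactivex.Observable

// Hypothetical result wrapper; not part of RxJava or the original code.
sealed class JobResult {
    data class Success(val jobId: Int) : JobResult()
    data class Failure(val jobId: Int, val error: Throwable) : JobResult()
}

// updateJob and printJob stand in for restService.updateJob / restService.printJob.
fun runAll(
    jobIds: List<Int>,
    updateJob: (Int) -> Observable<Int>,
    printJob: (Int) -> Observable<Int>
): Observable<JobResult> =
    Observable.fromIterable(jobIds)
        .concatMap { id ->
            updateJob(id)
                .concatMap { printJob(id) }   // skipped if updateJob fails
                .map<JobResult> { JobResult.Success(id) }
                // Turn an error from either request into a value, so the
                // outer stream continues with the next job.
                .onErrorReturn { e -> JobResult.Failure(id, e) }
        }

fun main() {
    val results = runAll(
        jobIds = listOf(1, 2, 3),
        updateJob = { id ->
            if (id == 2) Observable.error<Int>(RuntimeException("update $id failed"))
            else Observable.just(id)
        },
        printJob = { id -> Observable.just(id) }
    ).toList().blockingGet()

    // Jobs 1 and 3 arrive as Success, job 2 as Failure.
    results.forEach { println(it) }
}
```

    With this shape the subscriber's onNext receives one result per job and can log failures individually, while the onError callback fires only for errors outside the per-job chain.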