Search code examples
rx-javareactive-programmingrx-java2

RxJava - any problems with doing long running task in map and doOnSuccess


Consider the following

class Dummy {
    private fun firstStep(): Single<String> {
        return Single.just("dafe")
                .subscribeOn(Schedulers.io())
    }
    fun action1() {
        firstStep()
                .map {
                    mapStuff(it)
                }
                .doOnError {
                    println("${Thread.currentThread().name} it")
                }
                .doOnSuccess {
                    someVoidAction(it)
                }
                .flatMap { 
                    return@flatMap anotherStep(it)
                }
                .observeOn(Schedulers.trampoline())
                .subscribe( { result ->
                    println(result)
                }, {
                    it.printStackTrace()
                })
    }
    private fun mapStuff(it: String): Int {
        println("mapStuff on thread ${Thread.currentThread().name}")
        Thread.sleep(2000L)
        return it.length
    }
    private fun someVoidAction(i: Int) {
        println("someVoidAction on thread ${Thread.currentThread().name}")
        Thread.sleep(2000L)
    }
}

The only drawbacks I can see are:

  1. It's not very readable - looking at the chain of action, you cannot immediate tell that "mapStuff" and "doVoidAction" are actually long-running task like how you can tell for "anotherStep"
  2. In theory, one may want to use different thread for different tasks. For example, "mapStuff" may want Schedulers.computation() instead of Schedulers.io(). However, in practice, Schedulers.io() is used 99% of the time in my projects. So, do we need to care at all?
  3. Depending on needs, for rare situation, I may want to apply 2 seconds time out and "resume on error" logic on "mapStuff" or "doVoidAction". But then again, it's situational so in general case, it's ok to do "mapStuff" or "doVoidAction" in a non-reactive fashion as above right?

Are there other problems I'm not seeing?

The drawback in writing everything in RxJava fashion is that it takes a bit more work, depending on how fluent in (or obsessed with) RxJava one is.

It's quite simple for the simple dummy code here but can get a lot more complicated in real life:

fun action2() {
        firstStep()
                .flatMap {
                    mapStuffReactive(it)
                }
                .flatMap {
                    return@flatMap doStuffReactive(it)
                }
                .flatMap {
                    return@flatMap anotherStep(it)
                }
                .observeOn(Schedulers.trampoline())
                .subscribe( { result ->
                    println(result)
                }, {
                    it.printStackTrace()
                })
    }

    private fun doStuffReactive(i: Int): Single<Int> {
        return Completable.fromAction {
            println("someVoidAction $i")
        }
                .delay(2L, TimeUnit.SECONDS, Schedulers.io())
                .andThen(Single.just(i))
    }

    private fun mapStuffReactive(it: String): Single<Int> {
        return Single.just(it.length)
                .delay(2L, TimeUnit.SECONDS, Schedulers.io())
    }

Solution

  • Following the Rx Design Guidelines, you should never put code that causes side effects in a map. You should always make side-effecting code explicit.

    Code that causes side effects should only occur when creating an observable, in a subscribe, or rarely in a do operator. This will make it clear to anybody reading the code what's going on and will also make it easier to figure out what part of the chain is, or is not, testable.

    Side effects in the do operator are fine, in fact they are expected, however be sure you understand that these side effects will occur for every subscription of the Observable. IMO, if you can put the code in a subscribe, do that instead.