Search code examples
androidkotlinobservablerx-java

RxJava merge preserve order without sense


I read difference between merge and concat. Concat preserve order, merge not.

But in my case, merge always preserve also order. What can be wrong ? Why my merge waits for previous tasks ? I want achieve random result but in this case I always get A1 A2 A3... B1 B2 B3...

fun doSomething() {
        compositeDisposable.add(
            Observable.merge(getNumber1(), getNumber2())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                    { item -> println(item) },
                    { error -> println(error.message) }
                ))
    }

    private fun getNumber1(): Observable<String> {
        return Observable.create { emitter ->
            for (i in 1..10) {
                emitter.onNext("ObservableA: " + i)
            }
            emitter.onComplete()
        }
    }

    private fun getNumber2(): Observable<String> {
        return Observable.create { emitter ->
            for (i in 1..10) {
                emitter.onNext("ObservableB: " + i)
            }
            emitter.onComplete()
        }
    }

Solution

  • Why my merge waits for previous tasks ?

    This is because they’re both executed on the same thread. To both tasks work in parallel subscribeOn should be called on each of the Observables before they’re merged:

    Observable.merge(
        getNumber1().subscribeOn(Schedulers.io()),
        getNumber2().subscribeOn(Schedulers.io())
    ).observeOn(AndroidSchedulers.mainThread())
        ...
    

    Also you can add Thread.sleep(100) after emitter.onNext for better testing