I read difference between merge and concat. Concat preserve order, merge not.
But in my case, merge always preserve also order. What can be wrong ? Why my merge waits for previous tasks ? I want achieve random result but in this case I always get A1 A2 A3... B1 B2 B3...
fun doSomething() {
compositeDisposable.add(
Observable.merge(getNumber1(), getNumber2())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ item -> println(item) },
{ error -> println(error.message) }
))
}
private fun getNumber1(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableA: " + i)
}
emitter.onComplete()
}
}
private fun getNumber2(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableB: " + i)
}
emitter.onComplete()
}
}
Why my merge waits for previous tasks ?
This is because they’re both executed on the same thread. To both tasks work in parallel subscribeOn
should be called on each of the Observables before they’re merged:
Observable.merge(
getNumber1().subscribeOn(Schedulers.io()),
getNumber2().subscribeOn(Schedulers.io())
).observeOn(AndroidSchedulers.mainThread())
...
Also you can add Thread.sleep(100)
after emitter.onNext
for better testing