android, rx-java2, subject

RxJava2 - Emitting items using PublishSubject


I have a scenario with two subjects: subject1 is a PublishSubject and subject2 is a BehaviorSubject.

First I emit a single item to subject1, then I emit an item to subject2, and right after that I also want to emit a different item to subject1.

fun emittingItems() {
    subject1.onNext(functionA1)
    subject2.onNext(functionB)
    if (something) subject1.onNext(functionA2)
}

What happens is that I receive the items in this sequence: functionA1, functionA2, functionB.

Why do I get this behavior? How can I emit the items in this sequence instead: functionA1, functionB, functionA2?

Subscribing to subjects:

val disposable = viewModel.subject1
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::someFunction)
disposables.add(disposable)

Solution

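  • With observeOn(AndroidSchedulers.mainThread()) you schedule the delivery of events on the main thread. Each subscription has its own queue, and the Runnables posted to the main thread run sequentially, but a single Runnable may deliver more than one element if several have accumulated in its queue by the time it runs. That is why functionA2 can be delivered together with functionA1, before the Runnable for subject2 gets its turn.

    This is a kind of race condition. It always occurs when emittingItems() is called on the main thread itself, because none of the posted Runnables can run until emittingItems() returns, and it can occur when the function is called from any other thread.

    More generally, since you are handling two independent asynchronous streams, you cannot expect their two observers to see items in any particular relative order.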

    You can get the order you want by merging both sources into one stream:

    Observable.merge(subject1, subject2)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subject);
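
    To make the merged variant easier to try off-device, here is a minimal sketch. It assumes RxJava 2 on the classpath, uses a TestScheduler as a stand-in for AndroidSchedulers.mainThread(), and replaces functionA1, functionB and functionA2 with plain strings. The helper name mergedOrder is hypothetical and not part of the question's code.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: returns the order in which ONE observer of the
// merged stream receives the items emitted to both subjects.
fun mergedOrder(): List<String> {
    val subject1 = PublishSubject.create<String>()
    val subject2 = BehaviorSubject.create<String>()

    // Assumption: TestScheduler stands in for AndroidSchedulers.mainThread()
    // so that the sketch runs without Android.
    val scheduler = TestScheduler()
    val received = mutableListOf<String>()

    // Both subjects feed a single observeOn queue, so arrival order is kept.
    val disposable = Observable.merge(subject1, subject2)
        .observeOn(scheduler)
        .subscribe { item -> received.add(item) }

    // Same emission order as emittingItems() in the question.
    subject1.onNext("A1")
    subject2.onNext("B")
    subject1.onNext("A2")

    // Run the delivery that observeOn scheduled.
    scheduler.triggerActions()
    disposable.dispose()
    return received
}

fun main() {
    println(mergedOrder()) // prints [A1, B, A2]
}
```

    Note that the merged stream has a single observer for both sources, so if the two subjects carry different item types, the observer has to distinguish them itself, for example with a common sealed class.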