Search code examples
androidobservablerx-javaevent-buspublishsubject

RxJava as event bus is called multiple times even when only once triggered


I am trying to implement the RxJava event bus, where my use case is to get triggered when subscribed and when the event is sent. But with my code even when I send the event once, I am receiving multiple events. It is working fine for the first time, it is behaving weirdly from the second time I login into my application. i.e; For the first time desired code implemented once, for the second time it implemented two time and so on.

public class RxBus {

    public RxBus() {
    }

    private PublishSubject<String> bus = PublishSubject.create();

    public void send(String str) {
        bus.onNext(str);
    }

    public Observable<String> toObservable() {
        return bus;
    }

}

The code to subscribe RxBus is below:

public void sendEvents(){
        rxBus.send("Trigger event");
    }

 public void startListener(){
        rxBus.toObservable().subscribe(str -> {
           //do action//This is executing multiple lines
        });
    }

In the above code, even though when the sendEvents() is executed once the line containing "do action" is executing multiple times. So, is something I am doing wrong here. When I went through some blogs they are asking to unsubscribe the subscription when we visit that screen a second time. But how can I unsubscribe from that?

Help here is greatly appreciated!


Solution

  • Easy solution is to declare a field:

    SerialDisposable busDisposable = new SerialDisposable();
    

    Modify you startListener method:

    public void startListener() {
        busDisposable.set(rxBus.toObservable().subscribe(str -> {
            // ...
        }));
    }
    

    In that way, when you add new subscription the previous one will be disposed, so you will end up with only one subcription at a time. This is good if your startListener call is not determined by the lifecycle. (Remember to call busDisposable.dispose() when you no longer want to recieve events. )

    But if you call your startListener in onResume/onStart/onCreate, you should better use Disposable instead of SerialDisposable and simply call stopListener method in onPause/onStop/onDestroy.

    public void stopListener() {
        busDisposable.dispose();
    }