android rx-java observer-pattern observable rx-android

RxAndroid - Using Rx EventBus properly


I'm seeing exactly this behavior:

Subscriber OnComplete called twice

(which is expected, as per http://reactivex.io/documentation/subject.html)

In my scenario, it goes something like this:

I have an AudioRecordingService that displays a notification with options for the user to save or delete the ongoing recording, and that part works perfectly. I'm now trying to move to RxAndroid, so my notification's save button triggers:

RxEventBus.getInstance().postEvent(new RxEvents(RxEventsEnum.AUDIO_STOP_AND_SAVE));

which triggers

bindUntilActivitySpecificEvent(RxEventBus.getInstance().forEventType(RxEvents.class),ActivityEvent.DESTROY).subscribeOn(
        AndroidSchedulers.mainThread()).subscribe(new Action1<RxEvents>() {
      @Override public void call(RxEvents rxEvents) {
        onEvent(rxEvents);
      }
    });

In onEvent(rxEvents), I save and store the recording based on the rxEvents object's data. The first time I try this it works fine, but on subsequent attempts the

@Override public void call(RxEvents rxEvents) {
            onEvent(rxEvents);
          }

callback is called multiple times: the second time I post an event it is called twice, the third time three times, and so on (which is actually what PublishSubject does). I don't want this behavior. I want to be able to post events and receive only the latest event that was posted, and nothing else.

Here is my other relevant code

protected final <T> Observable<T> bindUntilActivitySpecificEvent(Observable<T> observable,
      ActivityEvent event) {
    return observable.compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(lifecycle(), event))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }

and my run-of-the-mill RxEventBus class:

public class RxEventBus {

  private static final RxEventBus INSTANCE = new RxEventBus();

  public static RxEventBus getInstance() {
    return INSTANCE;
  }

  private RxEventBus() {
  }

  private final Subject<Object, Object> mBus = new SerializedSubject<>(PublishSubject.create());


  public void postEvent(Object event) {
    mBus.onNext(event);
  }

  public <T> Observable<T> forEventType(Class<T> eventType) {
    return mBus.ofType(eventType).observeOn(AndroidSchedulers.mainThread());
  }
}

What is the best approach using RxAndroid? Please note that I am looking for an RxAndroid solution only.


Solution

  • You are creating a new observable every time you call

    RxEventBus.getInstance().forEventType(RxEvents.class)
    

    You need to cache the observables you create for each event type.
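
As a rough illustration of the caching idea, here is a minimal plain-Java sketch. It deliberately avoids RxJava so that it runs on its own: TypedStream and CachingEventBus are hypothetical stand-ins, not RxJava or RxAndroid classes. In the real RxEventBus, the cached value would presumably be the Observable returned by mBus.ofType(eventType).observeOn(AndroidSchedulers.mainThread()).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Plain-Java stand-in for a typed event stream; NOT an RxJava class. */
final class TypedStream<T> {
  private final Class<T> type;
  private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

  TypedStream(Class<T> type) {
    this.type = type;
  }

  /** Every call registers one more subscriber, so calling it twice doubles the callbacks. */
  void subscribe(Consumer<? super T> onNext) {
    subscribers.add(onNext);
  }

  /** Delivers the event to each subscriber, but only if it matches this stream's type. */
  void emit(Object event) {
    if (type.isInstance(event)) {
      T typed = type.cast(event);
      for (Consumer<? super T> subscriber : subscribers) {
        subscriber.accept(typed);
      }
    }
  }
}

/** Hypothetical bus that creates each per-type stream once and then reuses it. */
final class CachingEventBus {
  private final Map<Class<?>, TypedStream<?>> streams = new ConcurrentHashMap<>();

  /** Returns the cached stream for this event type, creating it on first use. */
  @SuppressWarnings("unchecked")
  <T> TypedStream<T> forEventType(Class<T> eventType) {
    // The cast is safe because the stream stored under a key is always built from that key.
    return (TypedStream<T>) streams.computeIfAbsent(
        eventType, key -> new TypedStream<T>(eventType));
  }

  /** Offers the event to every cached stream; each stream filters by its own type. */
  void postEvent(Object event) {
    for (TypedStream<?> stream : streams.values()) {
      stream.emit(event);
    }
  }
}
```

Note that caching only guarantees that the same instance comes back on every call. Each subscribe call still adds another subscriber, in this sketch and with a PublishSubject alike, so the subscribing code itself should also run only once per Activity or Service lifetime.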