Search code examples
androidrx-javareactivexbehaviorsubject

RxJava: BehaviorSubject not receiving subsequent emissions from source Observable


I'm having trouble trying to use a BehaviorSubject (via RxRelay.BehaviorRelay) to store the most recent emission from a continuous Observable.

By 'continuous', I mean the source Observable is designed to emit data whenever it's underlying dataset changes.

The BehaviorSubject is subscribed to the source Observable.

It appears that once I subscribe to the BehaviorSubject, I only ever seem to receive the first value emitted from the source Observable to the BehaviorSubject. The source Observable appears to no longer emit continuously, and in fact, ceases to emit any more items.

So, here's a somewhat contrived example:

//A Singleton
public class DataManager {

    private Observable<List<Item>> itemsObservable;

    //A BehaviorRelay (BehaviorSubject)
    public BehaviorRelay<List<Item>> itemsRelay = BehaviorRelay.create();

    private DataManager() {

        //An Observable which emits when subscribed, and then subsequently when the underlying uri's data changes
        itemsObservable = SqlBrite.createQuery(Uri uri, ...);

        //In practice I would lazily subscribe to the relay.
        itemsObservable.subscribe(itemsRelay);
    }
}

Now, subscribing to the BehaviorSubject from somewhere:

// Subscribe to the BehaviorSubject
DataManager.getInstance.itemsObservable.subscribe(items -> {
    //Here, I would expect 'items' to be the most recent List<Item> emitted from the source Observable to the BehaviorSubject.
    //However, it seems like it's only ever the *first* item emitted from the source Observable to the BehaviorSubject.
    //It seems like the source Observable never emits to the BehaviorSubject more than once, despite the source's underlying 
    //dataset having changed (I am triggering this change in testing).
});

Solution

  • It turns out there was just a flaw in my testing. The uri backing the SqlBrite Observable wasn't being notified of changes, so the Observable wasn't emitting any new values. There was also a bit of a red herring to do with the Subject being re-subscribed in an onResume lifecycle method.. All working as intended.