I'm having trouble trying to use a BehaviorSubject (via RxRelay.BehaviorRelay) to store the most recent emission from a continuous Observable.
By 'continuous', I mean the source Observable is designed to emit data whenever its underlying dataset changes.
The BehaviorSubject is subscribed to the source Observable.
Once I subscribe to the BehaviorSubject, I only ever receive the first value emitted from the source Observable to the BehaviorSubject. The source Observable appears to stop emitting continuously and, in fact, ceases to emit any more items.
So, here's a somewhat contrived example:
    // A Singleton
    public class DataManager {
        private Observable<List<Item>> itemsObservable;

        // A BehaviorRelay (BehaviorSubject)
        public BehaviorRelay<List<Item>> itemsRelay = BehaviorRelay.create();

        private DataManager() {
            // An Observable which emits when subscribed, and then subsequently when the underlying uri's data changes
            itemsObservable = SqlBrite.createQuery(Uri uri, ...);

            // In practice I would lazily subscribe to the relay.
            itemsObservable.subscribe(itemsRelay);
        }
    }
Now, subscribing to the BehaviorSubject from somewhere:
    // Subscribe to the BehaviorSubject (the public relay, not the private source Observable)
    DataManager.getInstance().itemsRelay.subscribe(items -> {
        // Here, I would expect 'items' to be the most recent List<Item> emitted from the source Observable to the BehaviorSubject.
        // However, it seems like it's only ever the *first* item emitted from the source Observable to the BehaviorSubject.
        // It seems like the source Observable never emits to the BehaviorSubject more than once, despite the source's underlying
        // dataset having changed (I am triggering this change in testing).
    });
It turns out there was just a flaw in my testing. The uri backing the SqlBrite Observable wasn't being notified of changes, so the Observable wasn't emitting any new values. There was also a bit of a red herring to do with the Subject being re-subscribed in an onResume lifecycle method. Everything is working as intended.
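
For anyone landing here later, this is a minimal plain-Java sketch of the behaviour I was expecting from the relay once the source really does emit: a late subscriber gets the most recent value, then every later one. LatestValueRelay and RelayDemo are hypothetical names I made up for illustration. This is not the RxRelay implementation and it ignores threading, unsubscription and errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a BehaviorRelay: remembers the latest value and
// replays it to each new subscriber. NOT the RxRelay implementation.
final class LatestValueRelay<T> implements Consumer<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;
    private boolean hasValue;

    // Called by the source each time it emits.
    @Override
    public synchronized void accept(T value) {
        latest = value;
        hasValue = true;
        // Iterate over a copy so a callback may subscribe without breaking the loop.
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }

    // A new subscriber first receives the latest value (if any), then later emissions.
    public synchronized void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (hasValue) {
            subscriber.accept(latest);
        }
    }
}

final class RelayDemo {
    static List<String> run() {
        LatestValueRelay<String> relay = new LatestValueRelay<>();
        List<String> received = new ArrayList<>();

        relay.accept("first");   // source emits when first subscribed
        relay.accept("second");  // underlying data changed, source emits again

        relay.subscribe(received::add); // late subscriber sees "second", not "first"
        relay.accept("third");          // later emissions are forwarded as well
        return received;
    }

    public static void main(String[] args) {
        System.out.println(RelayDemo.run()); // prints [second, third]
    }
}
```

If the source never emits a second time (as in my broken test, where the uri was never notified), the late subscriber only ever sees the first value, which is exactly the symptom described above.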