What I want to achieve: I want to poll some resources from the web by polling every 5 minutes but only when there is an observer subscribed. I use BehaviorSubject and interval observable for pooling. I managed to implement it but I'm new to Rx and I think it can be done better.
This is how i've done it:
private BehaviorSubject<String> observable;
private Subscription> subscription;
public Subscription subscribe(final Action1<String> action) {
if (observable == null) {
observable = BehaviorSubject.create();
}
if (subscription == null) {
Observable<String> source = Observable.interval(5, TimeUnit.MINUTES).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return getDataFromServer();
}
}).observeOn(AndroidSchedulers.mainThread());
subscription = source.subscribe(new Action1<String>() {
@Override
public void call(String s) {
if (observable.hasObservers()) {
observable.onNext(s);
} else {
subscription.unsubscribe();
subscription = null;
}
}
});
}
return observable.subscribe(action);
}
The idea: - I have an observable source for polling and another observable to which the clients can subscribe (implemented using BehaviourSubject - so they always get the latest data). When the source observable emits something if the behavioursubject has subscribers it is passed on, otherwise nothing is passed and I unsubscribe from the source so it will stop.
What about:
Observable<String> observable = Observable.interval(0, 5, TimeUnit.SECONDS)
.doOnNext(new LoggingAction1<Long>("doOnNext"))
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long pulse) {
return Observable.just(String.format("Request %d", pulse));
}
})
.replay(1)
.refCount();
I think it does all you want your setup to do:
Subscribers
it does nothing.Subscriber
subscribes, the interval
is started and emits one value right away and then one every 5 seconds.Subscriber
will get the last item right away and then all following ones.interval
will be started - and therefore only one network request will be executed every 5 seconds - no matter how many Subscribers
there are.Subscribers
have unsubscribed, the interval
will stop emitting items.