What I want to achieve: I want to poll a resource on the web every 5 minutes, but only while at least one observer is subscribed. I use a BehaviorSubject and an interval Observable for the polling. I managed to implement it, but I'm new to Rx and I think it can be done better.
This is how I've done it:
private BehaviorSubject<String> observable;
private Subscription subscription;

public Subscription subscribe(final Action1<String> action) {
    if (observable == null) {
        observable = BehaviorSubject.create();
    }
    if (subscription == null) {
        Observable<String> source = Observable.interval(5, TimeUnit.MINUTES).map(new Func1<Long, String>() {
            @Override
            public String call(Long aLong) {
                return getDataFromServer();
            }
        });
        subscription = source.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                if (observable.hasObservers()) {
                    observable.onNext(s); // pass the polled data on to the clients
                } else {
                    subscription.unsubscribe(); // nobody is listening: stop polling
                    subscription = null;
                }
            }
        });
    }
    return observable.subscribe(action);
}
The idea: I have a source observable that does the polling and a second observable that clients subscribe to (a BehaviorSubject, so they always get the latest data). When the source emits an item and the BehaviorSubject has subscribers, the item is passed on; otherwise nothing is passed on and I unsubscribe from the source so that the polling stops.
What about:
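Observable<String> observable = Observable.interval(0, 5, TimeUnit.SECONDS)
        .doOnNext(new LoggingAction1<Long>("doOnNext"))
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override
            public Observable<String> call(Long pulse) {
                return Observable.just(String.format("Request %d", pulse));
            }
        })
        .replay(1)    // cache the latest item for late subscribers
        .refCount();  // connect on the first subscriber, disconnect after the last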
I think it does all you want your setup to do:
- As long as nobody is subscribed, it does nothing.
- When the first Subscriber subscribes, the interval is started and emits one value right away and then one every 5 seconds.
- Every later Subscriber will get the last item right away and then all following ones.
- Only one interval will be started, and therefore only one network request will be executed every 5 seconds, no matter how many Subscribers there are.
- Once all Subscribers have unsubscribed, the interval will stop emitting items.
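To make the subscriber-counting behaviour described above more concrete, here is a library-free sketch in plain Java. It is not how RxJava implements its operators; it only mimics the observable behaviour (start on the first subscriber, share one poll per tick, replay the latest value, stop after the last unsubscribe) with a ScheduledExecutorService. The names RefCountedPoller, isPolling and periodMillis are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: polls only while at least one listener is subscribed.
final class RefCountedPoller<T> {
    private final Supplier<T> fetch;
    private final long periodMillis;
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private ScheduledExecutorService scheduler; // non-null only while polling
    private T last;
    private boolean hasLast;

    RefCountedPoller(Supplier<T> fetch, long periodMillis) {
        this.fetch = fetch;
        this.periodMillis = periodMillis;
    }

    // Registers a listener and returns a handle that unsubscribes it again.
    synchronized Runnable subscribe(Consumer<T> listener) {
        listeners.add(listener);
        if (listeners.size() == 1) {
            // First subscriber: start polling, first poll happens right away.
            hasLast = false;
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(this::poll, 0, periodMillis, TimeUnit.MILLISECONDS);
        } else if (hasLast) {
            // Later subscribers get the most recent value immediately.
            listener.accept(last);
        }
        return () -> unsubscribe(listener);
    }

    private void poll() {
        T value = fetch.get(); // one fetch per tick, shared by all listeners
        List<Consumer<T>> snapshot;
        synchronized (this) {
            if (listeners.isEmpty()) {
                return; // raced with the last unsubscribe
            }
            last = value;
            hasLast = true;
            snapshot = new ArrayList<>(listeners);
        }
        for (Consumer<T> listener : snapshot) {
            listener.accept(value);
        }
    }

    private synchronized void unsubscribe(Consumer<T> listener) {
        if (listeners.remove(listener) && listeners.isEmpty()) {
            // Last subscriber left: stop polling.
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    synchronized boolean isPolling() {
        return scheduler != null;
    }

    public static void main(String[] args) throws InterruptedException {
        RefCountedPoller<String> poller =
                new RefCountedPoller<>(() -> "data@" + System.nanoTime(), 100);
        Runnable first = poller.subscribe(v -> System.out.println("A got " + v));
        Thread.sleep(250);
        Runnable second = poller.subscribe(v -> System.out.println("B got " + v));
        first.run();
        second.run();
        System.out.println("polling after last unsubscribe: " + poller.isPolling());
    }
}
```

In the RxJava version the same bookkeeping is done for you by replay(1) and refCount(), so there is normally no reason to hand-roll it; the sketch is only meant to show what is being counted and when the polling starts and stops.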