Search code examples
rx-javapollingrx-android

Smart Polling using RxAndroid


What I want to achieve: I want to poll some resources from the web by polling every 5 minutes but only when there is an observer subscribed. I use BehaviorSubject and interval observable for pooling. I managed to implement it but I'm new to Rx and I think it can be done better.

This is how i've done it:

private BehaviorSubject<String> observable;
private Subscription> subscription;

public Subscription subscribe(final Action1<String> action) {
    if (observable == null) {
        observable = BehaviorSubject.create();
    }
    if (subscription == null) {
        Observable<String> source = Observable.interval(5, TimeUnit.MINUTES).map(new Func1<Long, String>() {
            @Override
            public String call(Long aLong) {
                return getDataFromServer();
            }
        }).observeOn(AndroidSchedulers.mainThread());
        subscription = source.subscribe(new Action1<String>() {

            @Override
            public void call(String s) {
                if (observable.hasObservers()) {
                    observable.onNext(s);
                } else {
                    subscription.unsubscribe();
                    subscription = null;
                }
            }
        });
    }
    return observable.subscribe(action);
}

The idea: - I have an observable source for polling and another observable to which the clients can subscribe (implemented using BehaviourSubject - so they always get the latest data). When the source observable emits something if the behavioursubject has subscribers it is passed on, otherwise nothing is passed and I unsubscribe from the source so it will stop.


Solution

  • What about:

    Observable<String> observable = Observable.interval(0, 5, TimeUnit.SECONDS)
        .doOnNext(new LoggingAction1<Long>("doOnNext"))
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override
            public Observable<String> call(Long pulse) {
                return Observable.just(String.format("Request %d", pulse));
            }
        })
        .replay(1)
        .refCount();
    

    I think it does all you want your setup to do:

    • As long as there are no Subscribers it does nothing.
    • When the first Subscriber subscribes, the interval is started and emits one value right away and then one every 5 seconds.
    • A new Subscriber will get the last item right away and then all following ones.
    • Only one interval will be started - and therefore only one network request will be executed every 5 seconds - no matter how many Subscribers there are.
    • When all Subscribers have unsubscribed, the interval will stop emitting items.