Search code examples
androidretrofitrx-javareactivex

Periodically call an observable and switch to next one only when next one succeeds


I need to poll endpoint every second, currently I do it with

Observable.interval(0, 1, TimeUnit.SECONDS, ioScheduler)
    .switchMap { return pollWithRetrofit() }

It works fine except the situation when the calls start taking more than 1 second to process, so the retrofit subscription is cancelled by swithMap before I get any response. It can happen multiple times in a row, effectively leaving the client without any response from the poll calls for long duration. In this case I would like to not cancel the retrofit call until I get a response from the next call.

I know that switchMap cancels the previous subscription when the base subscription produces onNext call, currently it happens every second by the Observable.interval, so my idea is to cancel previous call only when the retrofit calls it's onNext, i.e moving the switching one step forward the reactive chain.

How do I do that? Or is there some other solution?


Solution

  • You could use onBackpressureDrop and flatMap with maxConcurrency of 1 to make sure a longer call is still allowed to succeed:

    Flowable.interval(0, 1, TimeUnit.SECONDS, ioScheduler)
    .onBackpressureDrop()
    .flatMap(v -> pollWithRetrofit(), 1);