I would like to know how to ignore exceptions and continue infinite stream (in my case stream of locations)?
I'm fetching current user position (using Android-ReactiveLocation) and then sending them to my API (using Retrofit).
In my case, when exception occurs during network call (e.g. timeout) onError
method is invoked and stream stops itself. How to avoid it?
Activity:
private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
.setInterval(100);
...
private void start() {
mRestService = ...;
ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
.buffer(50)
.flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
RestService:
public interface RestService {
@POST("/.../")
Observable<Response> postLocations(@Body List<Location> locations);
}
mRestService.postLocations(locations)
emit one item, then complete.
If an error occur, then it emit the error, which complete the stream.
As you call this method in a flatMap
, the error continue to your "main" stream, and then your stream stops.
What you can do is to transform your error into another item (as described here : https://stackoverflow.com/a/28971140/476690 ), but not on your main stream (as I presume you already tried) but on the mRestService.postLocations(locations)
.
This way, this call will emit an error, that will be transformed to an item/another observable and then complete. (without calling onError
).
On a consumer view, mRestService.postLocations(locations)
will emit one item, then complete, like if everything succeed.
mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
.buffer(50)
.flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();