I'm new to RxJava, and there are a lot of examples and answers out there, but I'm having trouble piecing together something that addresses all of my concerns.
The code below seems to be working except that I occasionally get results out of order. So I think I need to introduce a switchMap, but I'm not sure exactly where, and I wonder if I haven't already made things unnecessarily complicated, like is it odd to subscribe to the RxBinding and separately subscribe to the network call?
view.autocompleteText() is an RxBinding. I want to get text from that, call the retrofit service, and update the view with results, with all the bells and whistles like showing/hiding spinner, and error handling for the network call that won't kill the whole thing and make my textbox unresponsive.
private void handleAutocompleteText() {
mCompositeDisposable.add(view.autocompleteText()
.debounce(400, TimeUnit.MILLISECONDS)
.filter(s -> s.length() >= resources.getAutocompleteThreshold())
.observeOn(threads.main())
.doOnNext(s -> view.setProgressVisible(true))
.subscribe(s -> {
mCompositeDisposable.add(mAutocompleteService.query(s.toString())
.subscribeOn(threads.io())
.observeOn(threads.main()).toObservable()
.retry(3)
.doOnNext(response -> {
if (response.getStatus() != 200)
throw new RuntimeException("Server error " + response.getStatus());
})
.map(response -> response.getData())
.subscribe(items -> {
view.setProgressVisible(false);
view.updateList(items);
}, error -> {
view.setProgressVisible(false);
view.showMessage(resources.getListError());
}));
}, error -> {
view.setProgressVisible(false);
view.showMessage(resources.getListError());
}
)
);
}
Is there a better way to do this, and where can I introduce a switchMap to discard any inflight searches and only update the view with results of the last one? And if the answer is drastically different, I'd really appreciate a walkthrough.
Thanks in advance!!
I am not sure this is you want. But it might give you hints and solutions.
Let's see the code. The switchMap
gets parameter from view and changes observable
to resrofit observable
. And then it returns response from API call
. After all of that you can receive the response and use the response.
If the code doesn't work let me know.
mCompositeDisposable.add(
view
.autocompleteText()
.debounce(400, TimeUnit.MILLISECONDS)
.filter({ s -> s.length() >= resources.getAutocompleteThreshold() })
.observeOn(threads.main())
.doOnNext({ s -> view.setProgressVisible(true) })
.map({ s -> s.toString())}
.switchMap({ text -> mAutocompleteService.query(text))}
.subscribeOn(threads.io())
.observeOn(threads.main())
.retry(3)
.doOnNext({ response ->
if (response.getStatus() !== 200)
throw RuntimeException("Server error " + response.getStatus())
})
.map({ response -> response.getData() })
.subscribe({ items ->
view.setProgressVisible(false)
view.updateList(items)
}, { error ->
view.setProgressVisible(false)
view.showMessage(resources.getListError())
})
)