Here is my use case:
I am developing an app that communicates with a server via a REST API and stores the received data in a SQLite database (it's using it as a cache of some sorts).
When the user opens a screen, the following has to occur:
This is very similar to the case presented here, but there is a slight difference.
Since I am using SQLBrite, the DB observables don't terminate (because there is a ContentObserver
registered there, that pushes new data down the stream), so methods like concat
, merge
, etc. won't work.
Currently, I have resolved this using the following approach:
Observable.create(subscriber -> {
dbObservable.subscribe(subscriber);
apiObservable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(
(data) -> {
try {
persistData(data);
} catch (Throwable t) {
Exceptions.throwOrReport(t, subscriber);
}
},
(throwable) -> {
Exceptions.throwOrReport(throwable, subscriber);
})
})
It seems like it's working OK, but it just doesn't seem elegant and "correct".
Can you suggest or point me to a resource that explains what's the best way to handle this situation?
The solution to your problem is actually super easy and clean if you change the way of thinking a bit. I am using the exact same data interaction (Retrofit + Sqlbrite) and this solution works perfectly.
What you have to do is to use two separate observable subscriptions, that take care of completely different processes.
Database
->
View
: This one is used to attach your View
(Activity
, Fragment
or whatever displays your data) to the persisted data in db. You subscribe to it ONCE for created View
.dbObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
displayData(data);
}, throwable -> {
handleError(throwable);
});
API
->
Database
: The other one to fetch the data from api and persist it in the db. You subscribe to it every time you want to refresh your data in the database.apiObservable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(data -> {
storeDataInDatabase(data);
}, throwable -> {
handleError(throwable);
});
EDIT:
You don't want to "transform" both observables into one, purely for the reason you've included in your question. Both observables act completely differently.
The observable
from Retrofit acts like a Single
. It does what it needs to do, and finishes (with onCompleted
).
The observable
from Sqlbrite is a typical Observable
, it will emit something every time a specific table changes. Theoretically it should finish in the future.
Ofc you can work around that difference, but it would lead you far, far away from having a clean and easily readable code.
If you really, really need to expose a single observable
, you can just hide the fact that you're actually subscribing to the observable from retrofit when subscribing to your database.
public void fetchRemoteData() {
apiObservable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(data -> {
persistData(data);
}, throwable -> {
handleError(throwable);
});
}
fetchRemoteData
on subscriptiondbObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(() -> fetchRemoteData())
.subscribe(data -> {
displayData(data);
}, throwable -> {
handleError(throwable);
});
I suggest you really think about all that. Because the fact that you're forcing yourself into the position where you need a single observable, might be restricting you quite badly. I believe that this will be the exact thing that will force you to change your concept in the future, instead of protecting you from the change itself.