I am implementing the repository pattern in RxJava, using SqlBrite/SqlDelight for offline data storage and Retrofit for HTTP requests.
Here's a sample of that:
    protected Observable<List<Item>> getItemsFromDb() {
        return database.createQuery(tableName(), selectAllStatement())
                .mapToList(cursor -> selectAllMapper().map(cursor));
    }

    public Observable<List<Item>> getItems() {
        Observable<List<Item>> server = getRequest()
                .doOnNext(items -> {
                    // Write the whole server response in a single transaction
                    BriteDatabase.Transaction transaction = database.newTransaction();
                    try {
                        for (Item item : items) {
                            database.insert(tableName(), contentValues(item));
                        }
                        transaction.markSuccessful();
                    } finally {
                        transaction.end();
                    }
                })
                .flatMap(items -> getItemsFromDb())
                .delaySubscription(200, TimeUnit.MILLISECONDS);
        // Skip empty results so that the server stream can win the race
        Observable<List<Item>> db = getItemsFromDb()
                .filter(items -> items != null && items.size() > 0);
        return Observable.amb(db, server).doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
    }
The current implementation uses `Observable.amb` to take whichever of the two streams emits first: it returns the `db` stream if `db` has data, and the `server` stream otherwise. To prevent an early failure when there is no internet connection, `server` has a `delaySubscription` of 200 ms on it.
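To illustrate the "first source to emit wins" behaviour I am relying on, here is a minimal plain-Java sketch that models the two streams as `CompletableFuture`s rather than Rx observables. `AmbSketch`, `firstToEmit` and `nonEmpty` are hypothetical names, not part of RxJava or SqlBrite, and an empty database is modelled as a future that never completes, mirroring the `filter` on `db`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Observable.amb(db, server): whichever source
// produces a value first is used, and the other one is ignored.
class AmbSketch {
    @SafeVarargs
    static <T> CompletableFuture<T> firstToEmit(CompletableFuture<T>... sources) {
        CompletableFuture<T> winner = new CompletableFuture<>();
        for (CompletableFuture<T> source : sources) {
            // complete() only takes effect for the first caller; later values are dropped.
            source.thenAccept(winner::complete);
        }
        return winner;
    }

    // Models db.filter(items -> items.size() > 0): an empty result never "emits".
    static CompletableFuture<List<String>> nonEmpty(List<String> rows) {
        if (rows.isEmpty()) {
            return new CompletableFuture<>(); // never completes
        }
        return CompletableFuture.completedFuture(rows);
    }
}
```

With cached rows, the `db` future wins immediately; with an empty table, the result stays pending until the `server` future completes.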
I tried using `Observable.concat`, but the SqlBrite stream never calls `onComplete`, so the `server` observable is never triggered.
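To make the `concat` problem concrete, here is a small synchronous model of it in plain Java. It is only a sketch: `ConcatSketch` and `Source` are hypothetical names, and the `completes` flag stands in for whether a source ever calls `onComplete`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous model of Observable.concat: the second source is
// only subscribed to after the first one signals completion.
class ConcatSketch {
    static final class Source {
        final List<String> items;
        final boolean completes; // false models a SqlBrite query, which stays open

        Source(List<String> items, boolean completes) {
            this.items = items;
            this.completes = completes;
        }
    }

    static List<String> concat(Source first, Source second) {
        List<String> received = new ArrayList<>(first.items);
        if (first.completes) {
            // Only reached when the first source calls onComplete.
            received.addAll(second.items);
        }
        return received;
    }
}
```

With a `db` source whose `completes` flag is `false`, the items of `server` are never delivered, which matches what I observe.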
I also tried `Observable.combineLatest`, which didn't work because it keeps waiting for the `server` observable to return data before emitting anything, and `Observable.switchOnNext` didn't work either.
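The `combineLatest` behaviour can be modelled in the same spirit. The sketch below is plain Java with hypothetical names (`CombineLatestSketch`, `onDb`, `onServer`); it only shows that nothing is emitted until both sources have produced a value, which is why cached data alone never reaches the UI.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Observable.combineLatest for two sources: nothing is
// emitted until each source has produced at least one value.
class CombineLatestSketch {
    private String latestDb;     // null until the db source emits
    private String latestServer; // null until the server source emits
    final List<String> emitted = new ArrayList<>();

    void onDb(String value) {
        latestDb = value;
        emitIfReady();
    }

    void onServer(String value) {
        latestServer = value;
        emitIfReady();
    }

    private void emitIfReady() {
        if (latestDb != null && latestServer != null) {
            emitted.add(latestDb + "+" + latestServer);
        }
    }
}
```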
What I am looking for is a repository which:
This is how you can solve the problem above, i.e., fetch data from two sources (local and remote) and send an update to the UI only when required.
The `Data` class wraps your data and also stores the source of the data:
    class Data<T> {
        static final int STATE_LOCAL = 0;
        static final int STATE_SERVER = 1;

        private T data;
        private int state;

        Data(T data, int state) {
            this.data = data;
            this.state = state;
        }

        public int getState() { return state; }
        public T getData() { return data; }
    }
    public Observable<Model> getData(long id) {
        // Used to cache data and compare it with server data, so we can avoid unnecessary UI updates
        BehaviorSubject<Data<Model>> publishSubject = BehaviorSubject.create();
        publishSubject.onNext(new Data<>(null, Data.STATE_LOCAL));

        Observable<Data<Model>> server = getRequest()
                .map(items -> new Data<>(items, Data.STATE_SERVER))
                // Here we are combining data from server and our `BehaviorSubject`
                // If anyone has ideas how to do this without the subject, I'll be glad to hear it.
                .flatMap(items -> Observable.zip(publishSubject.take(1), Observable.just(items), Pair::new))
                .flatMap(oldNewPair -> {
                    // Here we are comparing old and new data to see if there was any new data returned from server
                    Data<Model> prevData = oldNewPair.first;
                    Data<Model> newData = oldNewPair.second;
                    // Could be any condition to compare the old and new data
                    if (prevData.getData() != null
                            && prevData.getData().updated_at() == newData.getData().updated_at()) {
                        return Observable.just(prevData);
                    }
                    // insert() returns a row id, not an Observable; the SqlBrite query re-emits after the write
                    database.insert(tableName(), contentValues(newData.getData()));
                    return Observable.just(newData);
                });

        return getFromDb(id)
                .map(item -> new Data<>(item, Data.STATE_LOCAL))
                .doOnNext(item -> {
                    // Assumption: cache the latest DB value for the comparison above
                    // (this call is not in the snippet as posted).
                    publishSubject.onNext(item);
                    if (item.getState() == Data.STATE_LOCAL) {
                        // Assumption: the body of this branch is missing as posted; trigger the server refresh here.
                        server.subscribe(data -> {}, throwable -> {});
                    }
                })
                .map(Data::getData);
    }
This solution does not use `amb`; it relies on a `BehaviorSubject` instead, which solves the following problems:
- There is no need for `delaySubscription` (used earlier to prevent an early failure when there is no internet connection).
- Earlier, two calls were made to the server each time; that no longer happens.
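The core idea of comparing the cached copy with the server copy, and only writing and notifying when they differ, can be sketched without Rx at all. The following is a minimal plain-Java illustration under stated assumptions: `RefreshSketch`, its nested `Model`, `observe` and `onServerResponse` are hypothetical names, and an in-memory map stands in for the SqlBrite database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the database plus the "compare before
// writing" step; none of these names come from SqlBrite or RxJava.
class RefreshSketch {
    static final class Model {
        final long id;
        final long updatedAt;
        final String payload;

        Model(long id, long updatedAt, String payload) {
            this.id = id;
            this.updatedAt = updatedAt;
            this.payload = payload;
        }
    }

    private final Map<Long, Model> localStore = new HashMap<>();
    private final List<Consumer<Model>> uiListeners = new ArrayList<>();

    void observe(Consumer<Model> listener) {
        uiListeners.add(listener);
    }

    // Returns true when the server copy differed from the cached one and was written locally.
    boolean onServerResponse(Model fromServer) {
        Model cached = localStore.get(fromServer.id);
        if (cached != null && cached.updatedAt == fromServer.updatedAt) {
            return false; // unchanged: skip the write and the UI update
        }
        localStore.put(fromServer.id, fromServer);
        for (Consumer<Model> listener : uiListeners) {
            listener.accept(fromServer);
        }
        return true;
    }
}
```

Comparing a timestamp is just one possible condition; any equality check between the old and new data would serve the same purpose.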