Search code examples
androidretrofitrx-javasqlbritesqldelight

Repository pattern with SqlBrite/SqlDelight(Offline database) and Retrofit(Http request)


I am implementing repository pattern in RxJava using SqlBrite/SqlDelight for offline data storage and retrofit for Http requests

Here's a sample of that:

protected Observable<List<Item>> getItemsFromDb() {
     return database.createQuery(tableName(), selectAllStatement())
             .mapToList(cursor -> selectAllMapper().map(cursor));
 }


public Observable<List<Item>>getItems(){
     Observable<List<Item>> server = getRequest()
                 .doOnNext(items -> {
                     BriteDatabase.Transaction transaction = database.newTransaction();
                     for (Item item : items){
                         database.insert(tableName(), contentValues(item));
                     }
                     transaction.markSuccessful();
                     transaction.end();
                 })
                 .flatMap(items -> getItemsFromDbById())
                 .delaySubscription(200, TimeUnit.MILLISECONDS);
         Observable<List<Item>> db = getItemsFromDbById(id)
                 .filter(items -> items != null && items.size() > 0);
     return Observable.amb(db, server).doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
 }

The current implementation uses Observable.amb to get latest of 2 streams and returns db stream in case db has data or server otherwise. To prevent early failure in case of no internet, server has a delaySubscription on it with 200ms.

I tried using Observable.concat but the SqlBrite stream never calls onComplete so server observable is never triggered.

I also tried Observable.combineLatest which didn't work because it keeps waiting for server observable to return data before emitting anything and Observable.switchOnNext didn't work either.

What I am looking for is a repository which:

  • Keeps the subscription to SqlBrite (DB) open, in case of DB updates
  • Always fetches data from server and writes it to database
  • Should not emit empty result in case there was nothing in database and network request is still going on. This, because the user should see a progress bar in the case of the first load.

Solution

  • This is how you can solve the problem above, i.e., fetching data from 2 sources (local and remote) and send an update to UI only when required.

    The data class wraps your data and also stores the source of data

    class Data<T> {
    
        static final int STATE_LOCAL = 0;
        static final int STATE_SERVER = 1;
    
        private T data;
        private int state;
    
        Data(T data, int state) {
            this.data = data;
            this.state = state;
        }
    
        public int getState() { return state; }
    
        public T getData() { return data; }
    }
    

    ...

    public Observable<Model> getData(long id) {
    
        // Used to cache data and compare it with server data, so we can avoid unnecessary UI updates
        Subject<Data<Model>> publishSubject = BehaviorSubject.create();
        publishSubject.onNext(new Data<>(null, Data.STATE_LOCAL));
    
        Observable<Data<Model>> server = getRequest()
                .map(items -> new Data<>(items, Data.STATE_SERVER))
                // Here we are combining data from server and our `BehaviorSubject`
                // If any one has ideas how to do this without the subject, I'll be glad to hear it.
                .flatMap(items -> Observable.zip(publishSubject.take(1), Observable.just(items), Pair::new))
                .flatMap(oldNewPair -> {
                    // Here we are comparing old and new data to see if there was any new data returned from server
                    Data<Model> prevData = oldNewPair.first;
                    Data<Model> newData = oldNewPair.second;
                    //Could be any condition to compare the old and new data
                    if (prevData.data != null && prevData.data.updated_at() == newData.data.updated_at()) 
                        return Observable.just(prevData);
                    else
                        return database.insert(tableName(), contentValues(newData));
    
                    return getFromDb(id)
                            .map(item -> new Data<>(item, Data.STATE_LOCAL))
                            .onErrorResumeNext(server)
                            .doOnNext(item -> {
                                publishSubject.onNext(item);
                                if (item.getState() == Data.STATE_LOCAL)
                                    server.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe();
                            })
                            .map(item -> item.data);
    }
    

    This solution is without using amb and uses BehaviorSubject which solves the following problem:

    1. No use of delaySubscription(Earlier used to prevent early failure in case of no internet.)

    2. Earlier, each time two calls were made to the server which is solved in this case.