Search code examples
realmrx-javaobservablerx-java2

How to get Observable with List of objects from Realm using RxJava?


I have only this:

public Observable<List<Movie>> getAll() {
    return Observable.just(Movie.class)
            .flatMap(t -> Observable.just(t)
                    .doOnSubscribe(disposable -> realm.executeTransaction(realm1 -> realm1.where(Movie.class).findAll()))
                    .onErrorResumeNext((ObservableSource<? extends Class<Movie>>) observer -> Observable.empty())
                    .map(all -> realm.where(Movie.class).findAll())
            );
}

But it looks really ugly))

Everything would be okay, if there where possibilities to avoid duplicate code and save realm.where(Movie.class).findAll() and reuse in map(). RealmResults method addAll is deprecated.


Solution

  • As also described here:

    private io.reactivex.Flowable<List<Movie>> getAll() {
        return io.reactivex.Flowable.create(new FlowableOnSubscribe<List<Movie>>() {
            @Override
            public void subscribe(FlowableEmitter<List<Movie>> emitter)
                    throws Exception {
                Realm realm = Realm.getDefaultInstance();
                RealmResults<Movie> results = realm.where(Movie.class).findAllAsync();
                final RealmChangeListener<RealmResults<Movie>> listener = _realm -> {
                    if(!emitter.isUnsubscribed() && results.isLoaded()) {
                         emitter.onNext(results);
                    }
                };
                emitter.setDisposable(Disposables.fromRunnable(() -> {
                    results.removeChangeListener(listener);
                    realm.close();
                }));
                results.addChangeListener(listener);
            }
        }, BackpressureStrategy.LATEST)
        .subscribeOn(AndroidSchedulers.mainThread())
        .unsubscribeOn(AndroidSchedulers.mainThread());
    

    But as @masp said as a comment, you can read more about designing a reactive data layer using Realm with RxJava2 in my article about this on realm.io that was published a month ago.

    So with Realm 4.0.0-RC1 and above, you can actually just do

    private io.reactivex.Flowable<List<Movie>> getAll(Realm realm) {
        if(realm.isAutoRefresh()) {
            return realm.where(Movie.class)
                    .findAllAsync()
                    .asFlowable()
                    .filter(RealmResults::isLoaded);
        } else { // for background threads
            return Flowable.just(realm.where(Movie.class).findAll());
        }
    }