I have only this:
public Observable<List<Movie>> getAll() {
    return Observable.just(Movie.class)
            .flatMap(t -> Observable.just(t)
                    .doOnSubscribe(disposable -> realm.executeTransaction(realm1 -> realm1.where(Movie.class).findAll()))
                    .onErrorResumeNext((ObservableSource<? extends Class<Movie>>) observer -> Observable.empty())
                    .map(all -> realm.where(Movie.class).findAll())
            );
}
But it looks really ugly))
Everything would be okay if there were a way to avoid the duplicate code: save the result of realm.where(Movie.class).findAll() and reuse it in map(). The RealmResults method addAll is deprecated.
As also described here:
private io.reactivex.Flowable<List<Movie>> getAll() {
    return io.reactivex.Flowable.create(new FlowableOnSubscribe<List<Movie>>() {
        @Override
        public void subscribe(FlowableEmitter<List<Movie>> emitter)
                throws Exception {
            Realm realm = Realm.getDefaultInstance();
            RealmResults<Movie> results = realm.where(Movie.class).findAllAsync();
            final RealmChangeListener<RealmResults<Movie>> listener = _realm -> {
                // RxJava2's FlowableEmitter exposes isCancelled(), not isUnsubscribed()
                if(!emitter.isCancelled() && results.isLoaded()) {
                    emitter.onNext(results);
                }
            };
            // on disposal, remove the listener and close the Realm opened above
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                results.removeChangeListener(listener);
                realm.close();
            }));
            results.addChangeListener(listener);
        }
    }, BackpressureStrategy.LATEST)
            .subscribeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(AndroidSchedulers.mainThread());
}
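The core of the snippet above is a listener-to-stream bridge: register a change listener on subscribe, stop emitting once cancelled, and remove the listener on disposal. Here is a minimal plain-Java sketch of just that pattern, with no Realm or RxJava dependency. LiveResults and ListenerBridge are hypothetical stand-ins invented for illustration; they are not Realm API, and the returned Runnable only plays the role of the Disposable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for a live query result (the role RealmResults plays above).
final class LiveResults<T> {
    private final List<T> items = new ArrayList<>();
    private final List<Consumer<List<T>>> listeners = new CopyOnWriteArrayList<>();

    void addChangeListener(Consumer<List<T>> listener) { listeners.add(listener); }
    void removeChangeListener(Consumer<List<T>> listener) { listeners.remove(listener); }
    int listenerCount() { return listeners.size(); }

    // Simulates a write: mutate the data, then notify every listener with a snapshot.
    void add(T item) {
        items.add(item);
        List<T> snapshot = new ArrayList<>(items);
        for (Consumer<List<T>> listener : listeners) {
            listener.accept(snapshot);
        }
    }
}

// Hypothetical helper: bridges change notifications to a callback.
final class ListenerBridge {
    // Registers the listener and returns a handle that undoes the registration.
    static <T> Runnable observe(LiveResults<T> results, Consumer<List<T>> onNext) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Consumer<List<T>> listener = snapshot -> {
            // Same guard as the emitter check: never emit after cancellation.
            if (!cancelled.get()) {
                onNext.accept(snapshot);
            }
        };
        results.addChangeListener(listener);
        return () -> {
            cancelled.set(true);
            results.removeChangeListener(listener);
        };
    }
}
```

Calling the returned Runnable corresponds to disposing the subscription: later writes no longer reach the callback, and the listener is no longer registered, which is what prevents the leak.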
As @masp noted in a comment, you can read more about designing a reactive data layer using Realm with RxJava2 in my article on realm.io, published a month ago.
With Realm 4.0.0-RC1 and above, you can simply do:
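// asFlowable() emits RealmResults<Movie> (itself a List<Movie>); Java generics are
// invariant, so the return type names RealmResults rather than List
private io.reactivex.Flowable<RealmResults<Movie>> getAll(Realm realm) {
    if(realm.isAutoRefresh()) {
        return realm.where(Movie.class)
                .findAllAsync()
                .asFlowable()
                .filter(RealmResults::isLoaded);
    } else { // for background threads
        return Flowable.just(realm.where(Movie.class).findAll());
    }
}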