I use ORMLite and expose the database data as an Rx Observable.
public static <T> Observable<T> createObservable(@NonNull Observable<T> observable) {
    return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
}

public Observable<List<Country>> getCountriesObservable() {
    return RxUtils.createObservable(new Observable<List<Country>>() {
        @Override
        protected void subscribeActual(Observer<? super List<Country>> observer) {
            try {
                List<Country> list = mCountryDao.getCountries();
                observer.onNext(list == null ? Collections.<Country>emptyList() : list);
                observer.onComplete();
            } catch (SQLException exc) {
                Log.e(TAG, exc.getMessage());
                observer.onError(exc);
            }
        }
    });
}
I have three different classes: A, B and C. In each of these classes I create an Observer and subscribe it to the observable.
observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);
Afterwards I change the data in class A and update the database. Is there a way to tell the observable to emit the new data to all observers?
Or do I need to write this code again?
observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);
Disclaimer: I don't have direct experience with RxJava, only Rx.NET, RxJS and RxSwift (and I haven't done Java dev in 10+ years)
First, I'd recommend applying the share() operator to the observable before subscribing. That way the subscribers can share one subscription to the source, so you don't do extra work:
Observable<List<Country>> sharedObservable = observable.share();
sharedObservable.subscribe(observerA);
sharedObservable.subscribe(observerB);
sharedObservable.subscribe(observerC);
An observable is a stream of data. Right now it only contains a single value, but you can make it return multiple values. All subscribers will of course receive the new values coming in.
You'll need to signal that the data has been updated and then generate a new value when that happens. An easy way to push values into an observable on demand is to use a Subject. Subjects are a bit "un-Rx-y", but for simplicity's sake, I'll use one here:
// All this inside the same class
// PublishSubject has no public constructor; use the create() factory
private final PublishSubject<Object> updatesSubject = PublishSubject.create();

public void signalUpdate() {
    // The value itself is ignored; it only acts as a "reload now" signal
    updatesSubject.onNext(new Object());
}

public Observable<List<Country>> getCountriesObservable() {
    return updatesSubject
            .startWith(new Object())
            .flatMap(ignored -> RxUtils.createObservable(new Observable<List<Country>>() {
                @Override
                protected void subscribeActual(Observer<? super List<Country>> observer) {
                    try {
                        List<Country> list = mCountryDao.getCountries();
                        observer.onNext(list == null ? Collections.<Country>emptyList() : list);
                        observer.onComplete();
                    } catch (SQLException exc) {
                        Log.e(TAG, exc.getMessage());
                        observer.onError(exc);
                    }
                }
            }));
}
This will emit a List<Country> every time signalUpdate() is called. To make sure that you get a value when you first subscribe, we call startWith(), which emits the given value immediately and so launches the first load of the data.
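If it helps to see the mechanics without RxJava in the way, here is a rough plain-Java sketch of the same idea. It is only a model, not the RxJava implementation: RefreshableSource, RefreshDemo and the in-memory "table" are hypothetical names standing in for the subject-based observable and the ORMLite DAO. As a simplification, it loads the data once per signal and pushes that result to every subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for updatesSubject.startWith(...).flatMap(load):
// subscribers register once and receive a fresh snapshot on every signal.
final class RefreshableSource<T> {
    private final Supplier<T> loader; // e.g. () -> dao.getCountries()
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    RefreshableSource(Supplier<T> loader) {
        this.loader = loader;
    }

    // Plays the role of startWith(): a new subscriber gets the current data at once.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        subscriber.accept(loader.get());
    }

    // Plays the role of updatesSubject.onNext(...): reload, then push to everyone.
    void signalUpdate() {
        T fresh = loader.get();
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(fresh);
        }
    }
}

final class RefreshDemo {
    static List<String> run() {
        // Assumption: an in-memory list replaces the real database table.
        List<String> table = new ArrayList<>();
        table.add("France");
        List<String> log = new ArrayList<>();

        RefreshableSource<List<String>> countries =
                new RefreshableSource<>(() -> new ArrayList<>(table));

        // Each "class" subscribes exactly once.
        countries.subscribe(list -> log.add("A" + list));
        countries.subscribe(list -> log.add("B" + list));

        table.add("Japan");       // class A changes the data...
        countries.signalUpdate(); // ...and signals once; nobody resubscribes
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The point to take away is that A and B never subscribe a second time: the single call to signalUpdate() after the write is what delivers the updated list to both of them.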