Search code examples
javarx-javaobservablefrp

RxJava Observable alternative to create in async call


I listened to this talk https://www.youtube.com/watch?v=QdmkXL7XikQ&feature=youtu.be&t=274

And eared that I should avoid creating an Observable using the create method, because it doesn't handle unsubscription and backpressure automatically, but I can't find an alternative to use in the code bellow.

compositeSubscription.add(
    Observable.create(new Observable.OnSubscribe<DTOCompaniesCallback>() {
        @Override
        public void call(final Subscriber<? super DTOCompaniesCallback> subscriber) {

            modelTrainStrike.getCompaniesFromServer(new CompaniesCallback() {
                @Override
                public void onResult(DTOCompaniesCallback dtoCompaniesCallback) {
                    try {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(dtoCompaniesCallback);
                            subscriber.onCompleted();
                        }
                    } catch (Exception e) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(e);
                        }
                    }
                }
            });

        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<DTOCompaniesCallback>() {
        @Override
        public void call(DTOCompaniesCallback dtoCompaniesCallback) {
            Log.i("TAG", "onResult: " + dtoCompaniesCallback.getCompaniesList().size());
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            throw new OnErrorNotImplementedException("Source!", throwable);
        }
    })
);

And I call clear the CompositeSubscription in the OnDestroy method

@Override
public void onDestroy() {
    if (compositeSubscription != null) {
        compositeSubscription.clear();
    }
}

Do you see any alternative to the create method that I could use here? Do you see any potential danger or is this approach safe? Thanks


Solution

  • You can use defer + AsyncSubject:

    Observable.defer(() -> {
        AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
        modelTrainStrike.getCompaniesFromServer(v -> {
            async.onNext(v);
            async.onComplete();
        });
        return async;
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    ...
    

    In case the getCompaniesFromServer supports cancellation, you can:

    Observable.defer(() -> {
        AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
        Closeable c = modelTrainStrike.getCompaniesFromServer(v -> {
            async.onNext(v);
            async.onComplete();
        });
        return async.doOnUnsubscribe(() -> {
            try { c.close(); } catch (IOException ex) { }
        });
    })