Search code examples
javaandroidrx-javarx-java2rx-java3

Problems with RXJava


I'm adapting some sample code from what3words for accessing their API via their Java SDK. It uses RXJava.

The sample code is:

Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
        });

First of all. this gives a deprecation warning when building and a IDE warning (Result of 'Observable.subscribe()' is ignored).

To resolve this first issue I have added Disposable myDisposable = in front of the Observable. Is this correct? (See below for where it is added)

Next I need to add a timeout so that I can show a warning etc if the request times out. To do this I have added .timeout(5000, TimeUnit.MILLISECONDS) to the builder.

This works, but the way timeouts seem to work on Observables is that they throw an exception and I cannot figure out how to catch and handle that exception.

What I have right now is:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5000, TimeUnit.MILLISECONDS)
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
        });

This builds and runs fine, and the API/deprecation warning is not shown, BUT when no network is available this correctly times out and throws the unhandled exception.

So, the code seems to be correct, but how on earth do add the exception handling to catch the timeout TimeoutException that is thrown?

I've tried numerous things, including: adding a try-catch clause around the whole Observable - this warns that TimeoutException is not thrown by the code in the `try; and adding an error handler.

Adding the error handler has got me closest, and so the code below is as far as I have got:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5000, TimeUnit.MILLISECONDS)
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
         }, error -> {
             runOnUiThread(new Runnable() {
                 @Override
                 public void run() {
                     myTextView.setText(R.string.network_not_available);
                 }
             });
         });

This catches the Timeout correctly and updates my UI without error, however when the network is restored it seems that the Observable might be trying to return and a null pointer exception is thrown.

(Update, this NPE might actually be being thrown sometimes after a short time whether the network is restored or not... but it is always thrown when the network restores.)

I get FATAL EXCEPTION: RxCachedThreadScheduler-1 and java.lang.NullPointerException: Callable returned a null value. Null values are generally not allowed in 3.x operators and sources.

Do I need to destroy the Observable or something to prevent the NPE?


Solution

  • You need to add an onError handler to your subscribe call:

        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
         },
         error -> {
             // handle error here
         });
    

    When a an exception makes it to a subscribe call that does not have an onError handler, it will throw a OnErrorNotImplementedException, like this:

    io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
    

    Adding the onError handler will prevent that, and the onError handler will get called instead.