Search code examples
androidrx-java2

ObservableZip .subscribe() crash: null pointer exception, but what causes it?


I see a lot of these crashes in Google Play Console, but I can't see what exactly in code causes it from the stack trace. How can I find out?

java.lang.NullPointerException: 
  at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe (ObservableZip.java:110)
  at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual (ObservableZip.java:72)
  at io.reactivex.Observable.subscribe (Observable.java:12284)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run (ObservableSubscribeOn.java:96)
  at io.reactivex.Scheduler$DisposeTask.run (Scheduler.java:578)
  at io.reactivex.internal.schedulers.ScheduledRunnable.run (ScheduledRunnable.java:66)
  at io.reactivex.internal.schedulers.ScheduledRunnable.call (ScheduledRunnable.java:57)
  at java.util.concurrent.FutureTask.run (FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:301)
  at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1167)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:641)
  at java.lang.Thread.run (Thread.java:919)

I checked RxJava2 ObservableZip.java:110 and it's a call to .subscribe(). In my code I have Observable.zip() in some parts (mostly in Fragments) and it looks like this:

DisposableObserver<List<WalletBalance>> walletBalanceUpdateObserver = new DisposableObserver<List<WalletBalance>>() {
    @Override
    public void onNext(List<WalletBalance> results) {
        if(isAdded() && getActivity() != null) {
            // process results ...
        }
        this.dispose();
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() { }
};

Observable.zip(apiCalls, responses -> {
    List<WalletBalance> allWallets = new ArrayList<>();
    for (Object res : responses) {
        Response<WalletBalance> response = (Response<WalletBalance>) res;
        if (response != null && response.body() != null)
            allWallets.add(response.body());
    }

    return allWallets;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(walletBalanceUpdateObserver);

What could be causing the crash here?


Solution

  • Issue

    Most likely one of the elements from apiCalls in Observable.zip(apiCalls, responses -> { is null.

    Reproduce

    import io.reactivex.Observable;
    import org.assertj.core.api.Assertions;
    import org.junit.jupiter.api.Test;
    
    import java.util.Arrays;
    
    class So65600653 {
      @Test
      void zipTest() {
        Observable<Integer> zip =
            Observable.zip(
                Arrays.asList(Observable.just(1), null, Observable.just(3)),
                objects -> {
                  return 42;
                });
    
        Assertions.assertThatThrownBy(() -> {
            zip.test();
        }).isExactlyInstanceOf(NullPointerException.class);
      }
    }
    

    StackTrace

    java.lang.NullPointerException
        at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110)
        at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72)
    

    RxJava Source-Code

            for (int i = 0; i < len; i++) {
                if (cancelled) {
                    return;
                }
                sources[i].subscribe(s[i]); // Line 110
            }
    

    In this case, one could say sources is null, or sources[i] is null and calling subscribe on null will throw a NPE.

    But we can rule out, that sources is null because, it is checked above for it and given to ZipCoordinator(Observer<? super R> actual, .... The ZipCoordinator is called from

        ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
        zc.subscribe(sources, bufferSize);
    

    Which makes sure, that sources is not null.

        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new Observable[8];
            for (ObservableSource<? extends T> p : sourcesIterable) {
    

    Solution

    Filter out all null values from apiCalls before passing to Observable#zip.