Search code examples
rx-javarx-java2

rxjava 2.2.2, Occasionally deadlock


rxjava 2.2.2, occasionally deadlock in dump threads

the code as follows, Is this a problem?

Observable.fromCallable(() -> {
    Observable
        .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
        .flatMap(i -> Observable
        .fromCallable(() -> service.getType(type))
        .subscribeOn(Schedulers.io()),false, 5)
        .observeOn(Schedulers.io())
        .buffer(3)
        .blockingSubscribe(types -> {
            // ...
        });
        return Result.success();
    }
).retryWhen(new RetryWithDelay(5, 2000))
    .observeOn(Schedulers.io(), true)
    .subscribeOn(Schedulers.io())
    .subscribe(result -> {
        
    }, throwable -> {
        
    });

the thread dump as follows

"RxCachedThreadScheduler-5234" #10684 daemon prio=5 os_prio=0 tid=0x00007f690c02f800 nid=0x29f8 waiting on condition [0x00007f68008ce000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000c4b73b28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:56)
        at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:103)
        at io.reactivex.Observable.blockingSubscribe(Observable.java:5414)
        at com.test.Test.lambda$test$5(Test.java:221)
        at com.test.Test$$Lambda$79/1184820793.call(Unknown Source)
        at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:43)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
        at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)

Hope to get answers, thank you very much


Solution

  • If you use blockingX methods in RxJava, those can lead to deadlocks. Given your example, there is no reason to use blockingSubscribe at all. Try this instead:

    Observable.defer(() -> {
        return Observable
            .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
            .flatMap(i -> Observable.fromCallable(() -> service.getType(type))
                         .subscribeOn(Schedulers.io()),
                  false, 5
            )
            .observeOn(Schedulers.io())
            .buffer(3)
            .doOnNext(types -> {
                // ...
            })
            .ignoreElements()
            .andThen(Observable.just(Result.success()));
        }
    ).retryWhen(new RetryWithDelay(5, 2000))
        .observeOn(Schedulers.io(), true)
        .subscribeOn(Schedulers.io())
        .subscribe(result -> {
            
        }, throwable -> {
            
        });