RxJava 2.2.2: thread dumps occasionally show what looks like a deadlock.
The code is as follows. Is there a problem with it?
Observable.fromCallable(() -> {
        Observable
            .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
            .flatMap(i -> Observable
                    .fromCallable(() -> service.getType(type))
                    .subscribeOn(Schedulers.io()), false, 5)
            .observeOn(Schedulers.io())
            .buffer(3)
            .blockingSubscribe(types -> {
                // ...
            });
        return Result.success();
    }
).retryWhen(new RetryWithDelay(5, 2000))
.observeOn(Schedulers.io(), true)
.subscribeOn(Schedulers.io())
.subscribe(result -> {
}, throwable -> {
});
The thread dump is as follows:
"RxCachedThreadScheduler-5234" #10684 daemon prio=5 os_prio=0 tid=0x00007f690c02f800 nid=0x29f8 waiting on condition [0x00007f68008ce000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000c4b73b28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:56)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:103)
at io.reactivex.Observable.blockingSubscribe(Observable.java:5414)
at com.test.Test.lambda$test$5(Test.java:221)
at com.test.Test$$Lambda$79/1184820793.call(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:43)
at io.reactivex.Observable.subscribe(Observable.java:12090)
at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)
I hope to get an answer. Thank you very much.
If you use blockingX methods in RxJava, those can lead to deadlocks. Given your example, there is no reason to use blockingSubscribe at all. Try this instead:
Observable.defer(() -> {
        return Observable
            .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
            .flatMap(i -> Observable.fromCallable(() -> service.getType(type))
                    .subscribeOn(Schedulers.io()),
                false, 5
            )
            .observeOn(Schedulers.io())
            .buffer(3)
            .doOnNext(types -> {
                // ...
            })
            .ignoreElements()
            .andThen(Observable.just(Result.success()));
    }
).retryWhen(new RetryWithDelay(5, 2000))
.observeOn(Schedulers.io(), true)
.subscribeOn(Schedulers.io())
.subscribe(result -> {
}, throwable -> {
});
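
As a general illustration of why blocking inside a pooled worker is risky, here is a minimal sketch using only java.util.concurrent. It is not RxJava code and is not a diagnosis of the thread dump above (Schedulers.io() is not a single-thread pool). The class and method names (BlockingVsComposing, blockingCallStalls, composedCall) are hypothetical, and the single-thread executor is an assumption chosen to make the stall reproducible. The first method blocks on work queued behind itself, so the wait can only end by timeout. The second composes the same two steps without blocking and completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingVsComposing {

    // Returns true if the blocking wait inside the worker timed out.
    static boolean blockingCallStalls() throws Exception {
        // Assumption for the demo: a pool with exactly one worker thread.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // The inner task is queued behind the task that waits for it.
                Future<String> inner = pool.submit(() -> "inner-done");
                try {
                    // Blocks the only worker; without a timeout this would hang.
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return outer.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    // Runs the same two steps, but chains them instead of blocking.
    static String composedCall() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result = CompletableFuture
                    .supplyAsync(() -> "outer", pool)
                    // The worker returns right after scheduling the inner step,
                    // so the single thread is free to run it.
                    .thenComposeAsync(
                            ignored -> CompletableFuture.supplyAsync(() -> "inner-done", pool),
                            pool);
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking call stalled: " + blockingCallStalls());
        System.out.println("composed call result: " + composedCall());
    }
}
```

The non-blocking variant has the same shape as the suggestion above: express the inner work as part of the chain (there via doOnNext, ignoreElements and andThen) instead of waiting for it inside a callable.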