
Blocked thread while using Hazelcast


I have an application using Vert.x (4.3.3), RxJava, and Hazelcast (5.1.3), and I am having trouble using the async submitToKey method. When the application runs locally (Docker Compose), everything works as it should. Once I deploy it to Kubernetes, the application correctly discovers all the members, but then it logs blocked-thread warnings indefinitely. You can see the log below.

From my investigation, the blocked thread comes from the submitToKey method call:

private Completable handleUpdates(String id, Analysis analysis) {
    return Completable.fromFuture(cache.submitToKey(id, new AnalysisEntryProcessor(analysis)).toCompletableFuture());
}

Thread blocked log:

{"instant":{"epochSecond":1660923533,"nanoOfSecond":335170000},"thread":"vertx-blocked-thread-checker","level":"WARN","loggerName":"io.vertx.core.impl.BlockedThreadChecker","message":"Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 10694 ms, time limit is 2000 ms","thrown":{"commonElementCount":0,"localizedMessage":"Thread blocked","message":"Thread blocked","name":"io.vertx.core.VertxException","extendedStackTrace":"io.vertx.core.VertxException: Thread blocked\n\tat jdk.internal.misc.Unsafe.park(Native Method) ~[?:?]\n\tat java.util.concurrent.locks.LockSupport.park(Unknown Source) ~[?:?]\n\tat com.hazelcast.spi.impl.AbstractInvocationFuture.manageParking(AbstractInvocationFuture.java:693) ~[app.jar:?]\n\tat com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:615) ~[app.jar:?]\n\tat com.hazelcast.spi.impl.DelegatingCompletableFuture.get(DelegatingCompletableFuture.java:109) ~[app.jar:?]\n\tat io.reactivex.internal.functions.Functions$FutureAction.run(Functions.java:161) ~[app.jar:?]\n\tat io.reactivex.internal.operators.completable.CompletableFromAction.subscribeActual(CompletableFromAction.java:35) ~[app.jar:?]\n\tat io.reactivex.Completable.subscribe(Completable.java:2309) ~[app.jar:?]\n\tat io.reactivex.internal.operators.single.SingleFlatMapCompletable$FlatMapCompletableObserver.onSuccess(SingleFlatMapCompletable.java:91) ~[app.jar:?]\n\tat io.reactivex.internal.operators.single.SingleJust.subscribeActual(SingleJust.java:30) ~[app.jar:?]\n\tat io.reactivex.Single.subscribe(Single.java:3666) ~[app.jar:?]\n\tat io.reactivex.internal.operators.single.SingleFlatMapCompletable.subscribeActual(SingleFlatMapCompletable.java:44) ~[app.jar:?]\n\tat io.reactivex.Completable.subscribe(Completable.java:2309) ~[app.jar:?]\n\tat io.reactivex.internal.operators.completable.CompletableObserveOn.subscribeActual(CompletableObserveOn.java:34) ~[app.jar:?]\n\tat io.reactivex.Completable.subscribe(Completable.java:2309) ~[app.jar:?]\n\tat io.reactivex.internal.operators.flowable.FlowableFlatMapCompletableCompletable$FlatMapCompletableMainSubscriber.onNext(FlowableFlatMapCompletableCompletable.java:130) ~[app.jar:?]\n\tat io.reactivex.internal.operators.mixed.MaybeFlatMapPublisher$FlatMapPublisherSubscriber.onNext(MaybeFlatMapPublisher.java:75) ~[app.jar:?]\n\tat io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.fastPath(FlowableFromIterable.java:178) ~[app.jar:?]\n\tat io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:122) ~[app.jar:?]\n\tat io.reactivex.internal.subscriptions.SubscriptionHelper.deferredSetOnce(SubscriptionHelper.java:202) ~[app.jar:?]\n\tat io.reactivex.internal.operators.mixed.MaybeFlatMapPublisher$FlatMapPublisherSubscriber.onSubscribe(MaybeFlatMapPublisher.java:124) ~[app.jar:?]\n\tat io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:69) ~[app.jar:?]\n\tat io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47) ~[app.jar:?]\n\tat io.reactivex.Flowable.subscribe(Flowable.java:14935) ~[app.jar:?]\n\tat io.reactivex.Flowable.subscribe(Flowable.java:14882) ~[app.jar:?]\n\tat io.reactivex.internal.operators.mixed.MaybeFlatMapPublisher$FlatMapPublisherSubscriber.onSuccess(MaybeFlatMapPublisher.java:119) ~[app.jar:?]\n\tat io.reactivex.internal.operators.maybe.MaybePeek$MaybePeekObserver.onSuccess(MaybePeek.java:122) ~[app.jar:?]\n\tat io.vertx.reactivex.impl.AsyncResultMaybe.lambda$subscribeActual$0(AsyncResultMaybe.java:50) ~[app.jar:?]\n\tat io.vertx.reactivex.impl.AsyncResultMaybe$$Lambda$2064/0x0000000840908040.handle(Unknown Source) ~[?:?]\n\tat io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141) ~[app.jar:?]\n\tat io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54) ~[app.jar:?]\n\tat io.vertx.core.impl.future.FutureBase$$Lambda$227/0x00000008402b9040.run(Unknown Source) ~[?:?]\n\tat io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[app.jar:?]\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[app.jar:?]\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[app.jar:?]\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) ~[app.jar:?]\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[app.jar:?]\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[app.jar:?]\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[app.jar:?]\n\tat java.lang.Thread.run(Unknown Source) ~[?:?]\n"},"endOfBatch":true,"loggerFqcn":"io.vertx.core.logging.Logger","threadId":19,"threadPriority":5}

I tried setting -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, downgrading Hazelcast to 4.2.5, and increasing the CPU quota, but none of it made a difference.
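Since the behaviour differs between Docker Compose and Kubernetes, it can help to check what the JVM actually sees inside the pod. A small diagnostic like the one below can be run with the same JVM flags as the application. It only uses JDK APIs; the class name JvmResourceCheck is made up for illustration. On recent JDKs with container support, availableProcessors() typically reflects the pod's CPU limit, so a pod limited to one CPU will usually report 1 here even if the node has more cores.

```java
import java.util.concurrent.ForkJoinPool;

// Diagnostic sketch (hypothetical class name): prints the CPU and common-pool
// settings the JVM actually sees, e.g. inside a Kubernetes pod.
class JvmResourceCheck {

    // CPUs visible to the JVM; in a container this typically follows the CPU limit.
    static int availableCpus() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of ForkJoinPool.commonPool(); the JDK reports at least 1.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Value passed via -Djava.util.concurrent.ForkJoinPool.common.parallelism, or null if absent.
    static String parallelismOverride() {
        return System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors     = " + availableCpus());
        System.out.println("commonPool parallelism  = " + commonPoolParallelism());
        System.out.println("parallelism -D override = " + parallelismOverride());
    }
}
```

Comparing this output locally and in the pod shows whether the "lack of resources" mentioned in the solution below is really in effect.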

Thank you for any help.


Solution

  • Finally, it seems the problem was a combination of the implementation and a lack of resources for the application.

    Completable.fromFuture(cache.submitToKey(id, new AnalysisEntryProcessor(analysis)).toCompletableFuture());
    

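    Completable.fromFuture() calls get() on the future, which blocks the calling thread, here the Vert.x event loop. The stack trace shows exactly this: Functions$FutureAction.run calls DelegatingCompletableFuture.get, which ends up parked in AbstractInvocationFuture.manageParking. With only a single CPU available, this causes a deadlock and the event loop thread is never released.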

    The working solution is:

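    private Completable handleOnlyUpdates(String id, Analysis analysis) {
        // Subscribe with a callback instead of calling get(), so the event loop never parks.
        return Completable.create(emitter ->
                        cache.submitToKey(id, new AnalysisEntryProcessor(analysis))
                                .whenComplete((v, error) -> {
                                    if (error != null) {
                                        emitter.onError(error);
                                    } else {
                                        emitter.onComplete();
                                    }
                                }))
                // The callback may fire on a non-event-loop thread; hop back to the Vert.x context.
                .observeOn(RxHelper.scheduler(vertx.getOrCreateContext()));
    }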
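The difference between the two versions can be reproduced with nothing but the JDK. The sketch below is an analogy, not Hazelcast or Vert.x code: a single-threaded ExecutorService stands in for the event loop, and the names (EventLoopAnalogy, blockingGetTimesOut, deliverOnLoop, callbackResult) are hypothetical. blockingGetTimesOut parks the only thread in get() while the task that would complete the future waits in the same queue, which is the shape of the fromFuture problem. callbackResult registers a whenComplete callback and hops back onto the loop, which is the shape of the working solution.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Analogy only: a single-threaded executor stands in for the Vert.x event loop.
class EventLoopAnalogy {

    // Like Completable.fromFuture: park the loop thread in get() until the value arrives.
    // Returns true if the wait timed out because the completing task could never run.
    static boolean blockingGetTimesOut() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outcome = eventLoop.submit(() -> {
                CompletableFuture<String> source = new CompletableFuture<>();
                // The work that completes the future is queued behind the current task.
                eventLoop.execute(() -> source.complete("done"));
                try {
                    source.get(500, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true; // the loop is waiting on work only it can run
                }
            });
            return outcome.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    // Like the working solution: register a callback, then hop back onto the loop
    // (the role observeOn(RxHelper.scheduler(...)) plays in the answer).
    static <T> CompletableFuture<T> deliverOnLoop(CompletionStage<T> stage, ExecutorService loop) {
        CompletableFuture<T> delivered = new CompletableFuture<>();
        stage.whenComplete((value, error) -> loop.execute(() -> {
            if (error != null) {
                delivered.completeExceptionally(error);
            } else {
                delivered.complete(value);
            }
        }));
        return delivered;
    }

    static String callbackResult() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            // Subscribing on the loop returns immediately instead of parking the thread...
            Future<CompletableFuture<String>> subscribed =
                    eventLoop.submit(() -> deliverOnLoop(source, eventLoop));
            // ...so the queued completing task gets to run on that same thread.
            eventLoop.execute(() -> source.complete("done"));
            return subscribed.get().get(2, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking get() timed out: " + blockingGetTimesOut()); // true
        System.out.println("callback result: " + callbackResult());              // done
    }
}
```

Hazelcast's real threading is more involved than a single queue, so treat this as an illustration of why calling get() on the event loop is dangerous rather than a reproduction of the exact failure in the question.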