Search code examples
javadisruptor-patternproject-reactorlmax

reactor lmax thread dump


Trying to understand below (partial) stack trace. Using spring's project reactor 2.0.4.

I got task scheduler threads that are supposed to create tasks and allocate tasks for worker threads. The application is hung at this point (worker threads not logging anything, threaddumps taken at different times always the same etc).

Can I state that the ring buffer is full based on the line :at reactor.jarjar.com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)

if the ring buffer is full why are my worker threads not picking up any of these tasks? Can the ringbuffer state become corrupt?

    "task-scheduler-9" prio=10 tid=0x00007f2e78aa7000 nid=0x3a7a waiting on condition [0x00007f2e651b6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:349)
        at reactor.jarjar.com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
        at reactor.jarjar.com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
        at reactor.jarjar.com.lmax.disruptor.RingBuffer.next(RingBuffer.java:246)
        at reactor.core.dispatch.WorkQueueDispatcher.allocateTask(WorkQueueDispatcher.java:172)
        at reactor.core.dispatch.AbstractLifecycleDispatcher.dispatch(AbstractLifecycleDispatcher.java:117)
        at reactor.core.dispatch.AbstractLifecycleDispatcher.execute(AbstractLifecycleDispatcher.java:133)
        at reactor.spring.core.task.AbstractAsyncTaskExecutor.execute(AbstractAsyncTaskExecutor.java:293)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)


"pollerExecutor-19" daemon prio=10 tid=0x00007f2e78ba2000 nid=0x3a6f waiting on condition [0x00007f2e65bc0000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007016bd818> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at reactor.jarjar.com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:45)
        at reactor.jarjar.com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:55)
        at reactor.jarjar.com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

   Locked ownable synchronizers:
        - <0x00000007011e7058> (a java.util.concurrent.ThreadPoolExecutor$Worker)

Solution

  • If you look at MultiProducerSequencer.java # 136 we see the ring buffer wrap point > a gating sequence. Disruptor: Gating Sequence says this is all about publishers overwriting events that haven't been handled. As you say, why are my worker threads not picking up any of these tasks? I think you need to take a look at your onEvent code that implements your EventHandler<T>