java disruptor-pattern lmax

LMAX Disruptor Dependency Graph/Gating with SequenceBarrier


Goal

I'm trying to create a dependency relationship between handlers that's somewhat circular, and I can't quite figure out how to get it right. What I want to achieve is a variation of producer -> [handlers 1-3] -> handler 4.

So the baseline is disruptor.handleEventsWith(h1, h2, h3).then(h4); but I have two additional requirements:

  1. While handlers 1-3 do process messages in parallel, none of them begins to process the next message until they have all finished the previous message.
  2. After the first message, handlers 1-3 wait for handler 4 to have finished the most recent message before processing the next message.

The equivalent execution logic using a single event handler could be:

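disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
  // Run h1-h3 in parallel; forEach returns only after all three have finished.
  // (onEvent declares a checked exception, so real code needs a try/catch here.)
  Arrays.asList(h1, h2, h3).parallelStream()
        .forEach(h -> h.onEvent(event, sequence, endOfBatch));
  // Only then does h4 run, so it sees a consistent state from h1-h3.
  h4.onEvent(event, sequence, endOfBatch);
});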

Context

The design context is that handlers 1-3 each update their own state according to the message and that after a message is processed by each of the three they are in a consistent state. Handler 4 then runs some logic based on the state updated by handlers 1-3. So handler 4 should only see consistent states for the data structures maintained by 1-3, which means that handlers 1-3 should not process the next message until handler 4 has finished.

(Though the goal is definitely to use the Disruptor to manage the concurrency, rather than parallel streams from java.util.stream.)

Not sure if it matters, but it's also the case that handler 4's logic can be broken into two parts, one requiring that none of handlers 1-3 are being updated and the next requiring only that the first part of handler 4 has finished. So handlers 1-3 can be processing a message while the second part of handler 4 is still executing.

Is there a way to accomplish this? Or maybe my design is flawed? I feel like there should be a way to do this via SequenceBarrier but I don't quite understand how to implement this custom barrier. For handlers 1-3, I think I'd like to make a barrier with the logic handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence(), but I'm not sure where to put that logic.

Thanks!


Solution

  • I would consider making the handlers stateless and having the messages they process carry the state of your system. That way you won't need to synchronize your handlers at all.
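
A minimal sketch of that idea, under some loud assumptions: the Event class, its field names, and the arithmetic in each handler are all hypothetical, and a parallel stream stands in for the Disruptor's handleEventsWith(h1, h2, h3).then(h4) gating only so the example runs without the library. The point it tries to show is that each of handlers 1-3 writes to its own slot on the event, and handler 4 reads only from the event, so handler 4 never observes state belonging to a later message.

```java
import java.util.List;
import java.util.function.Consumer;

public class StatelessHandlersSketch {

    /** Hypothetical event: the input plus one output slot per handler. */
    static final class Event {
        long input;
        long doubled;   // written only by H1
        long squared;   // written only by H2
        long negated;   // written only by H3
        long combined;  // written only by H4
    }

    // Stateless handlers: each reads the input and fills in its own slot,
    // so no two handlers ever write the same field.
    static final Consumer<Event> H1 = e -> e.doubled = e.input * 2;
    static final Consumer<Event> H2 = e -> e.squared = e.input * e.input;
    static final Consumer<Event> H3 = e -> e.negated = -e.input;

    // H4 depends only on what H1-H3 stored on this particular event.
    static final Consumer<Event> H4 =
            e -> e.combined = e.doubled + e.squared + e.negated;

    /** Stand-in for one pass of an event through the handler graph. */
    static long process(long input) {
        Event e = new Event();
        e.input = input;
        // Stand-in for handleEventsWith(h1, h2, h3): run in parallel and
        // return only after all three have finished with this event.
        List.of(H1, H2, H3).parallelStream().forEach(h -> h.accept(e));
        // Stand-in for .then(h4): runs after H1-H3 are done with this event.
        H4.accept(e);
        return e.combined;
    }

    public static void main(String[] args) {
        System.out.println(process(3)); // prints 12 (6 + 9 - 3)
    }
}
```

With the state carried on the event, handlers 1-3 could move on to the next sequence while handler 4 is still working on the current one, because the next message's results land in a different event object. If handlers 1-3 need cumulative state, one possible variation is for each to keep its running state private and copy an immutable snapshot into the event; whether that fits depends on how large the state is.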