Search code examples
javaqueuereal-timemessage-queuedisruptor-pattern

How to implement a demultiplexer using the disruptor pattern?


I would like to have a ring buffer queue that will receive objects and distribute them across multiple threads from a thread pool, in a single producer to multiple consumers fashion. How do I accomplish that using the disruptor pattern? Any HelloDemux code example? Thanks!!!


Solution

  • This article details everything about demultiplexers implementing the disruptor pattern but I think a thread pool implies that you will need a dispatcher, which goes against the disruptor pattern. To implement a demux you setup a fixed number of consumer threads, not a pool, and let them grab the messages from the tail of the queue. Now, you might ask, how can they do that without a dispatcher? They simply busy spin (or use some other kind of wait strategy that is a combination of spinning, yielding, parking, sleeping, etc.) around the queue tail. Now, you might ask, how can they do that without stepping into each other? Then you have two options: you can use MODULUS (lock-free) or CAS (light lock). Each one has its own advantages and disadvantages. MODULUS is fast but can cause lane contention if one consumer falls behind. CAS is not as fast but does not cause lane contention.

    package com.coralblocks.coralqueue.sample.demux;
    
    import com.coralblocks.coralqueue.demux.CASAtomicDemux;
    import com.coralblocks.coralqueue.demux.Demux;
    
    public class Sample {
    
        private static final int NUMBER_OF_CONSUMERS = 4;
    
        public static void main(String[] args) throws InterruptedException {
    
            final Demux<StringBuilder> queue = new CASAtomicDemux<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);
    
            Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
    
            for(int i = 0; i < consumers.length; i++) {
    
                final int index = i;
    
                consumers[i] = new Thread() {
    
                    @Override
                    public void run() {
    
                        boolean running = true;
    
                        while(running) {
                            long avail;
                            while((avail = queue.availableToPoll(index)) == 0); // busy spin
                            for(int i = 0; i < avail; i++) {
                                StringBuilder sb = queue.poll(index);
    
                                if (sb == null) break; // mandatory for demuxes!
    
                                if (sb.length() == 0) {
                                    running = false;
                                    break; // exit immediately...
                                } else {
                                    System.out.println(sb.toString());
                                }
                            }
                            queue.donePolling(index);
                        }
                    }
                };
    
                consumers[i].start();
            }
    
            StringBuilder sb;
    
            for(int i = 0; i < 10; i++) {
                while((sb = queue.nextToDispatch()) == null); // busy spin
                sb.setLength(0);
                sb.append("message ").append(i);
                queue.flush();
            }
    
            // send a message to stop consumers...
            for(int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
                // because the consumer exit immediately on this message, each
                // consumer will get one of these messages and exit...
                while((sb = queue.nextToDispatch()) == null); // busy spin
                sb.setLength(0);
            }
            queue.flush(); // sent batch
    
            for(int i = 0; i < consumers.length; i++) consumers[i].join();
        }
    }