c++ multithreading disruptor-pattern lmax

In an LMAX Disruptor-like pattern, how do you handle a slow consumer?


I have a question about what to do with a slow consumer in an LMAX Disruptor-like ring buffer that has multiple producers and a single consumer, running on x86 Linux. With an LMAX-like ring buffer pattern you are constantly overwriting data, but what if the consumer is slow? For example, in a ring buffer of size 10 (slots 0-9), how do you handle the case where the consumer is still on slot 5 and the writers are ready to write sequence 15, which maps to slot 5 as well (15 % 10 = 5)? What is the typical way to handle this so that writers still produce data in the order it came in and clients receive it in the same order? That's really my question.

Below are some details about my design. It works fine; I just don't currently have a good way to handle this issue. There are multiple threads doing writes and a single thread doing reads. I can't introduce multiple reader threads without changing the existing design, which is beyond the current project scope, but I'm still interested in your thoughts even if they involve that as a solution.

Design specifics

I have a ring buffer, and the design currently has multiple producer threads and a single consumer thread. This part of the design already exists and cannot change for now. I am trying to replace the existing queueing system with a lock-free ring buffer. What I have is as follows.

The code runs on x86 Linux. There are multiple writer threads and a single reader thread. The reader and writer sequences are both std::atomic<uint64_t> and start one slot apart: the reader starts at slot 0 and the writer at slot 1. Each writer first claims a slot by calling incrementSequence (shown below), which does an atomic fetch_add(1, std::memory_order_seq_cst) on the writer sequence. Afterwards it uses a compare-and-swap loop to update the reader sequence, which lets clients know the slot is available; see updateSequence.

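    // Claim the next slot; returns the sequence value before the increment.
    inline data_type incrementSequence() {
        return m_sequence.fetch_add(1, std::memory_order_seq_cst);
    }

    // Advance the sequence from aOld to aNew so the reader can see the slot.
    void updateSequence(data_type aOld, data_type aNew) {
        // On failure, compare_exchange_weak stores the current value in aOld.
        while (!m_sequence.compare_exchange_weak(aOld, aNew,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed)) {
            if (sequence() < aNew) {
                continue;  // sequence is still below aNew: retry
            }
            break;  // sequence is already at or past aNew: stop
        }
    }

    // Read the current sequence value.
    inline data_type sequence() const {
        return m_sequence.load(std::memory_order_acquire);
    }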
      

Solution

  • A ring buffer (or a FIFO in general--doesn't have to be implemented as a ring buffer) is intended to smooth out bursts of traffic. Even though producers may produce the data in bursts, the consumers can deal with a steady flow of input.

    If you're overflowing the FIFO, it means one of two things:

    1. Your bursts are larger than you planned for. Fix this by increasing the FIFO size (or making its size dynamic).
    2. Your producers are out-running your consumers. Fix this by increasing the resources devoted to consuming the data (probably more threads).

    It sounds to me like you're currently hitting the second: your single consumer simply isn't fast enough to keep up with the producers. The only real choices in that case are to speed up consumption by either optimizing the single consumer, or adding more consumers.

    It also sounds a bit as if your consumer may be leaving their input data in the FIFO while they process the data, so that spot in the FIFO remains occupied until the consumer finishes processing that input. If so, you may be able to fix your problem by simply having the consumer remove the input data from the FIFO as soon as it starts processing. This frees up that slot so the producers can continue placing input into the buffer.
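
    As a rough illustration of that last point, here is a minimal sketch of a consumer step that moves the item out of its slot and marks the slot free before doing the slow work. Everything in it (Ring, Message, process, consumeOne, the published and consumed counters) is hypothetical and not taken from the question's code, and it assumes producers check consumed before reusing a slot.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical payload type, for illustration only.
struct Message { std::string payload; };

// Hypothetical ring: producers advance `published`, the consumer advances `consumed`.
struct Ring {
    static constexpr std::size_t kCapacity = 4;
    std::array<Message, kCapacity> slots;
    std::atomic<std::uint64_t> published{0};  // sequences below this are readable
    std::atomic<std::uint64_t> consumed{0};   // sequences below this are free for reuse
};

// Stand-in for the slow per-message work.
std::size_t process(const Message& m) { return m.payload.size(); }

// One consumer step. Returns false when nothing is ready to read.
bool consumeOne(Ring& ring, std::size_t& result) {
    const std::uint64_t seq = ring.consumed.load(std::memory_order_relaxed);
    if (seq == ring.published.load(std::memory_order_acquire)) {
        return false;  // ring is empty
    }
    // Take the data out of the slot first...
    Message local = std::move(ring.slots[seq % Ring::kCapacity]);
    // ...then release the slot, so producers may reuse it immediately...
    ring.consumed.store(seq + 1, std::memory_order_release);
    // ...and only then do the slow work, on the private copy.
    result = process(local);
    return true;
}
```

    The trade-off is one extra move or copy per message. Whether that is cheaper than stalling the producers depends on the payload size and on how long processing takes.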

    One other point: making the FIFO size dynamic has a drawback. It can hide the fact that you really have the second problem: not enough resources on the consumer side to process the data.

    Assuming both the producers and the consumers are thread pools, the easiest way to balance the system is often to use a fixed-size FIFO. If the producers start to get so far ahead of the consumers that the FIFO overflows, then producers start to block. This lets the consumer thread pool consume more computational resources (e.g., run on more cores) to catch back up with the producers. This does, however, depend on being able to add more consumers, not restricting the system to a single consumer.
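
    For the single-consumer case in the question, the "producers block when the FIFO is full" idea could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not the question's actual design: BoundedRing, push, pop and the three counters are made-up names, sequences start at 0, T must be default-constructible and copyable, and producers simply spin with yield() while they wait, so the structure is not strictly lock-free.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical bounded multi-producer / single-consumer ring (illustrative names).
template <typename T, std::size_t Capacity>
class BoundedRing {
public:
    // Any producer thread. Waits while the target slot is still unread.
    void push(const T& value) {
        // Claim a unique, increasing sequence number.
        const std::uint64_t seq = m_claimed.fetch_add(1, std::memory_order_relaxed);
        // Back-pressure: slot seq % Capacity still holds sequence seq - Capacity
        // until the consumer has moved past it.
        while (seq - m_consumed.load(std::memory_order_acquire) >= Capacity) {
            std::this_thread::yield();
        }
        m_slots[seq % Capacity] = value;
        // Publish strictly in claim order: wait for all earlier sequences.
        while (m_published.load(std::memory_order_acquire) != seq) {
            std::this_thread::yield();
        }
        m_published.store(seq + 1, std::memory_order_release);
    }

    // Single consumer thread only. Returns false when nothing is ready.
    bool pop(T& out) {
        const std::uint64_t seq = m_consumed.load(std::memory_order_relaxed);
        if (seq == m_published.load(std::memory_order_acquire)) {
            return false;
        }
        out = m_slots[seq % Capacity];
        m_consumed.store(seq + 1, std::memory_order_release);  // frees the slot
        return true;
    }

private:
    T m_slots[Capacity];
    std::atomic<std::uint64_t> m_claimed{0};    // next sequence to hand out
    std::atomic<std::uint64_t> m_published{0};  // sequences below this are readable
    std::atomic<std::uint64_t> m_consumed{0};   // sequences below this are free
};

// Demo: 3 producers push 1000 items each through an 8-slot ring.
// Returns true if the consumer saw each producer's items in order.
bool demoBackPressure() {
    constexpr int kProducers = 3;
    constexpr int kPerProducer = 1000;
    BoundedRing<std::uint64_t, 8> ring;
    std::vector<std::thread> producers;
    for (int p = 0; p < kProducers; ++p) {
        producers.emplace_back([&, p] {
            for (int i = 0; i < kPerProducer; ++i) {
                // Encode producer id and per-producer counter in one value.
                ring.push(static_cast<std::uint64_t>(p) * kPerProducer + i);
            }
        });
    }
    std::vector<int> next(kProducers, 0);
    bool ordered = true;
    std::uint64_t v = 0;
    for (int received = 0; received < kProducers * kPerProducer;) {
        if (!ring.pop(v)) {
            std::this_thread::yield();
            continue;
        }
        const int p = static_cast<int>(v / kPerProducer);
        const int i = static_cast<int>(v % kPerProducer);
        ordered = ordered && (i == next[p]);
        next[p] = i + 1;
        ++received;
    }
    for (auto& t : producers) {
        t.join();
    }
    return ordered;
}
```

    With this scheme a writer holding sequence 15 in a 10-slot ring waits until the consumer has finished sequence 5, so nothing is overwritten and data is delivered in claim order. The cost is that a slow consumer now stalls the writers instead of losing data.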