Tags: python, multiprocessing, queue, producer-consumer

Implementing a sorted producer/consumer queue with Multiprocessing


I have a pretty common producer/consumer scenario, with one twist.

I need to read lines of text from a multi-gigabyte input stream (which could be a file or an HTTP stream); process each line with a slow and CPU-intensive algorithm that will output a line of text for each line of input; then write the output lines to another stream. The twist is that I need to write the output lines in the same order as the input lines that produced them.

The usual approach to these scenarios is to use a multiprocessing.Pool to run the CPU-intensive algorithm, with a Queue feeding in lines (actually, batches of lines) from the reader process, and another Queue leading out of the Pool and into the writer process:

                       / [Pool] \    
  [Reader] --> InQueue --[Pool]---> OutQueue --> [Writer]
                       \ [Pool] /
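
For reference, here is a minimal sketch of that pipeline (process_batch and the hard-coded batches are placeholders of mine; with a Pool, InQueue is effectively the pool's internal task queue):

    from multiprocessing import Pool, Process, Queue

    def process_batch(batch):
        return [line.upper() for line in batch]  # stand-in for the CPU-heavy algorithm

    def writer(out_queue):
        for batch in iter(out_queue.get, None):  # None is the end-of-stream sentinel
            for line in batch:
                print(line)

    if __name__ == "__main__":
        out_queue = Queue()
        w = Process(target=writer, args=(out_queue,))
        w.start()
        batches = [["a", "b"], ["c", "d"], ["e", "f"]]  # stand-in for the reader
        with Pool(3) as p:
            for result in p.imap_unordered(process_batch, batches):
                out_queue.put(result)  # completion order, not input order
        out_queue.put(None)  # tell the writer to stop
        w.join()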

But how can I make sure the output lines (or batches) are written in the right order?

One simple answer is, "just write them to a temporary file, then sort the file and write it to the output". I might end up doing that, but I'd really like to start streaming output lines as soon as possible -- as opposed to waiting for the entire input stream to be processed from start to finish.
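
(For completeness, a tiny sketch of that fallback, assuming each output line is tagged with its input index. The in-memory sort below is for illustration only; a multi-gigabyte output would need an external sort, e.g. the Unix sort utility.)

    import tempfile

    def sort_after_the_fact(indexed_results):
        # indexed_results: (input_index, output_line) pairs, in arrival order
        with tempfile.TemporaryFile("w+") as f:
            for i, line in indexed_results:
                f.write("{}\t{}\n".format(i, line))
            f.seek(0)
            for row in sorted(f, key=lambda r: int(r.split("\t", 1)[0])):
                print(row.split("\t", 1)[1], end="")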

I could easily write my own implementation of multiprocessing.Queue that sorts its items internally, using a dict (or a circular-buffer list), a Lock, and two Conditions (plus maybe an integer counter), as sketched below. However, I'd need to get all of these objects from a Manager, and I'm afraid that sharing state like this between multiple processes would kill my performance. So, is there some appropriately Pythonic way around this issue?
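
For illustration, here is roughly the shape I have in mind (a sketch only; every call below round-trips through the Manager process, which is exactly the overhead I'm worried about):

    from multiprocessing import Manager

    manager = Manager()
    buffer = manager.dict()        # input index -> output line
    new_item = manager.Condition()

    def put_result(index, line):   # called by the pool workers
        with new_item:
            buffer[index] = line
            new_item.notify_all()

    def get_next(expected_index):  # called by the writer, in index order
        with new_item:
            while expected_index not in buffer:
                new_item.wait()
            return buffer.pop(expected_index)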


Solution

  • Maybe I'm missing something, but it seems there's a basic answer to your question.

    Let's take a simple example: we just want to reverse lines from a text. Here are the lines we want to reverse:

    INPUT = ["line {}".format(i)[::-1] for i in range(30)]
    

    That is:

    ['0 enil', '1 enil', '2 enil', ..., '92 enil']
    

    And here's the way to reverse those lines:

    import time, random
    
    def process_line(line):
        time.sleep(1.5*random.random()) # simulate a heavy computation
        return line[::-1]
    

    These lines are coming from a source:

    def source():
        for s in INPUT:
            time.sleep(0.5*random.random()) # simulation of the network latency
            yield s
    

    We can use multiprocessing to increase the speed:

    from multiprocessing import Pool
    
    with Pool(3) as p:
        for line in p.imap_unordered(process_line, source()):
            print(line)
    

    But our lines are not in the expected order:

    line 0
    line 2
    line 1
    line 5
    line 3
    line 4
    ...
    line 27
    line 26
    line 29
    line 28
    

    To get the lines in the expected order, you can:

    1. index the lines
    2. process them and
    3. gather them in the expected order.

    First, index the lines:

    def wrapped_source():
        for i, s in enumerate(source()):
            yield i, s
    

    Second, process each line, but keep its index:

    def wrapped_process_line(args):
        i, line = args
        return i, process_line(line)
    

    Third, gather the lines in the expected order. The idea is to use a counter and a heap. The counter is the expected index of the next line.

    Take the next pair (index, processed line):

    • If the index is equal to the counter, just yield the processed line.
    • If not, store the pair (index, processed line) in the heap.

    Then, while the smallest index in the heap is equal to the counter, pop the smallest element and yield the line.

    Loop until the source is empty, and then flush the heap.

    from heapq import heappush, heappop
    h = []
    
    with Pool(3) as p:
        expected_i = 0 # the expected index of the next line
        for i, line in p.imap_unordered(wrapped_process_line, wrapped_source()):
            if i == expected_i: # lucky!
                print(line)
                expected_i += 1
            else: # unlucky!
                heappush(h, (i, line)) # store the processed line
    
            while h: # look for the line "expected_i" in the heap
                i_smallest, line_smallest = h[0] # the smallest element
                if i_smallest == expected_i:
                    heappop(h)
                    print(line_smallest)
                    expected_i += 1
                else:
                    break # the line "expected_i" has not been processed yet.
    
        while h: # flush the heap
            print(heappop(h)[1]) # the line
    

    Now our lines are in the expected order:

    line 0
    line 1
    line 2
    line 3
    line 4
    line 5
    ...
    line 26
    line 27
    line 28
    line 29
    

    There is no additional delay: if the next expected line has not been processed yet, we have to wait; but as soon as it arrives, we output it.
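
    If you need this more than once, the whole reordering loop packages naturally as a generator. Here is a sketch (ordered_results and _indexed_apply are names I'm introducing; func must be picklable, i.e. defined at module level like process_line above):

    from functools import partial
    from heapq import heappush, heappop
    from multiprocessing import Pool

    def _indexed_apply(func, args):
        i, x = args
        return i, func(x)

    def ordered_results(pool, func, iterable):
        """Map func over iterable with imap_unordered, yielding results in input order."""
        h = []
        expected_i = 0
        tasks = enumerate(iterable)  # index the items, as above
        for i, result in pool.imap_unordered(partial(_indexed_apply, func), tasks):
            heappush(h, (i, result))
            while h and h[0][0] == expected_i:  # yield everything that is ready
                yield heappop(h)[1]
                expected_i += 1
        while h:  # flush: only non-empty if some index never arrived
            yield heappop(h)[1]

    with Pool(3) as p:
        for line in ordered_results(p, process_line, source()):
            print(line)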

    The main drawback is that you have to handle potential gaps manually (timeout, new request, ...): once you have indexed your lines, if you lose a line (for whatever reason), the loop will wait for it until the source is exhausted, and only then flush the heap. In that case, you are likely to run out of memory.
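
    One simple mitigation sketch (MAX_PENDING is an arbitrary name and value of mine): cap how many out-of-order results you are willing to buffer, and fail fast (or re-request the missing line) once the cap is exceeded:

    from heapq import heappush

    MAX_PENDING = 10000  # arbitrary cap on buffered out-of-order results

    def push_checked(h, expected_i, i, line):
        heappush(h, (i, line))
        if len(h) > MAX_PENDING:
            raise RuntimeError(
                "still waiting for line {} with {} later lines buffered; "
                "it is probably lost".format(expected_i, len(h)))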