I have a pretty common producer/consumer scenario, with one twist.
I need to read lines of text from a multi-gigabyte input stream (which could be a file or an HTTP stream); process each line with a slow and CPU-intensive algorithm that will output a line of text for each line of input; then write the output lines to another stream. The twist is that I need to write the output lines in the same order as the input lines that produced them.
The usual approach to these scenarios is to use a multiprocessing.Pool to run the CPU-intensive algorithm, with a Queue feeding in lines (actually, batches of lines) from the reader process, and another Queue leading out of the Pool and into the writer process:
                     / [Pool] \
[Reader] --> InQueue --[Pool]---> OutQueue --> [Writer]
                     \ [Pool] /
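For concreteness, here's a minimal sketch of that layout (unordered output; slow_algorithm stands in for my real CPU-bound step, the reader and writer run inline for brevity, and batching is omitted):

import multiprocessing as mp

def slow_algorithm(line):  # stand-in for the real CPU-intensive step
    return line[::-1]

def worker(in_q, out_q):
    for line in iter(in_q.get, None):  # run until the sentinel
        out_q.put(slow_algorithm(line))
    out_q.put(None)  # tell the writer this worker is done

if __name__ == "__main__":
    n_workers = 3
    in_q, out_q = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(in_q, out_q))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for line in ("line %d" % i for i in range(10)):  # the reader part
        in_q.put(line)
    for _ in range(n_workers):  # one sentinel per worker
        in_q.put(None)
    done = 0
    while done < n_workers:  # the writer part
        item = out_q.get()
        if item is None:
            done += 1
        else:
            print(item)  # completion order, not input order!
    for w in workers:
        w.join()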
But how can I make sure the output lines (or batches) are sorted in the right order?
One simple answer is, "just write them to a temporary file, then sort the file and write it to the output". I might end up doing that, but I'd really like to start streaming output lines as soon as possible -- as opposed to waiting for the entire input stream to be processed from start to finish.
I could easily write my own implementation of multiprocessing.Queue that sorts its items internally, using a dict (or a circular-buffer list), a Lock, and two Conditions (plus maybe an integer counter). However, I'd need to get all of these objects from a Manager, and I'm afraid that using shared state like this between multiple processes would kill my performance. So, is there some appropriately Pythonic way around this issue?
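For illustration, the kind of thing I have in mind is sketched below (OrderedBuffer and its method names are made up); note that every put and get round-trips through the Manager process, which is exactly the overhead I'm worried about:

from multiprocessing import Manager

class OrderedBuffer:
    def __init__(self, manager):
        self._items = manager.dict()        # index -> processed line
        self._cond = manager.Condition()    # wakes up the waiting writer
        self._next = manager.Value('i', 0)  # index of the next line to emit

    def put(self, index, item):             # called by the pool workers
        with self._cond:
            self._items[index] = item
            self._cond.notify_all()

    def get(self):                          # called by the writer process
        with self._cond:
            while self._next.value not in self._items:
                self._cond.wait()
            item = self._items.pop(self._next.value)
            self._next.value += 1
            return item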
Maybe I'm missing something, but it seems there's a basic answer to your question.
Let's take a simple example: we just want to reverse lines of text. Here are the lines we want to reverse:
INPUT = ["line {}".format(i)[::-1] for i in range(30)]
That is:
['0 enil', '1 enil', '2 enil', ..., '92 enil']
And here's the way to reverse those lines:
import time, random

def process_line(line):
    time.sleep(1.5*random.random())  # simulate a heavy computation
    return line[::-1]
These lines are coming from a source:
def source():
    for s in INPUT:
        time.sleep(0.5*random.random())  # simulate network latency
        yield s
We can use multiprocessing to increase the speed:
from multiprocessing import Pool

with Pool(3) as p:
    for line in p.imap_unordered(process_line, source()):
        print(line)
But our lines are not in the expected order:
line 0
line 2
line 1
line 5
line 3
line 4
...
line 27
line 26
line 29
line 28
To get the lines in the expected order, you can:
First, index the lines:
def wrapped_source():
    for i, s in enumerate(source()):
        yield i, s
Second, process the line, but keep the index:
def wrapped_process_line(args):
    i, line = args
    return i, process_line(line)
Third, gather the lines in the expected order. The idea is to use a counter and a heap. The counter is the expected index of the next line.
Take the next pair (index, processed line): if the index is equal to the counter, output the line and increment the counter; otherwise, push the pair onto the heap.
Then, while the smallest index in the heap is equal to the counter, pop the smallest element, output its line, and increment the counter.
Loop until the source is exhausted, and then flush the heap.
from heapq import heappush, heappop

h = []
with Pool(3) as p:
    expected_i = 0  # index of the next expected line
    for i, line in p.imap_unordered(wrapped_process_line, wrapped_source()):
        if i == expected_i:  # lucky!
            print(line)
            expected_i += 1
        else:  # unlucky!
            heappush(h, (i, line))  # store the processed line for later
        while h:  # look for the line "expected_i" in the heap
            i_smallest, line_smallest = h[0]  # the smallest element
            if i_smallest == expected_i:
                heappop(h)
                print(line_smallest)
                expected_i += 1
            else:
                break  # the line "expected_i" has not been processed yet
    while h:  # flush the heap
        print(heappop(h)[1])  # the line
Now our lines are in the expected order:
line 0
line 1
line 2
line 3
line 4
line 5
...
line 26
line 27
line 28
line 29
There is no additional delay: if the next expected line has not been processed yet, we have to wait for it, but as soon as it arrives, it is output immediately.
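If you want to reuse this, the same logic can be packaged as a generator. This is just a sketch: ordered_results and _indexed_call are names I made up, and _indexed_call must live at module level so the pool can pickle it:

from functools import partial
from heapq import heappush, heappop
from multiprocessing import Pool

def _indexed_call(func, args):  # top-level, so the pool can pickle it
    i, x = args
    return i, func(x)

def ordered_results(pool, func, iterable):
    """Yield func(x) for each x of iterable, in input order."""
    h, expected_i = [], 0
    results = pool.imap_unordered(partial(_indexed_call, func),
                                  enumerate(iterable))
    for i, value in results:
        heappush(h, (i, value))
        while h and h[0][0] == expected_i:  # release the ready prefix
            yield heappop(h)[1]
            expected_i += 1
    while h:  # flush (empty unless some index never arrived)
        yield heappop(h)[1]

# process_line and source as defined above
with Pool(3) as p:
    for line in ordered_results(p, process_line, source()):
        print(line)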
The main drawback is that you have to handle potential gaps manually (timeout, new request, ...): once your lines are indexed, if you lose a line for whatever reason, the loop will wait for that line until the source is exhausted, and only then flush the heap. In that case you are likely to run out of memory.
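If that risk matters to you, one possible mitigation (a sketch; max_pending is an arbitrary bound, not something multiprocessing provides) is to cap the heap and fail fast instead of silently exhausting memory:

def ordered_results_bounded(pool, func, iterable, max_pending=10000):
    """Like ordered_results above, but give up loudly if a line goes missing."""
    h, expected_i = [], 0
    results = pool.imap_unordered(partial(_indexed_call, func),
                                  enumerate(iterable))
    for i, value in results:
        heappush(h, (i, value))
        if len(h) > max_pending:  # line expected_i is long overdue
            raise RuntimeError("line %d still missing after %d later results"
                               % (expected_i, len(h)))
        while h and h[0][0] == expected_i:
            yield heappop(h)[1]
            expected_i += 1
    while h:  # flush the heap
        yield heappop(h)[1]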