I have a function that makes a request to a server, retrieves some data, processes it, and saves a CSV file. This function should be launched 20k times. Each execution lasts a different amount of time: sometimes more than 20 minutes, other times less than a second. I decided to go with `multiprocessing.Pool.map` to parallelize the execution. My code looks like:
```python
from multiprocessing import Pool

def get_data_and_process_it(filename):
    print('getting', filename)
    ...
    print(filename, 'has been processed')

with Pool(8) as p:
    p.map(get_data_and_process_it, long_list_of_filenames)
```
Looking at how the prints are generated, it seems that `long_list_of_filenames` is being split into 8 parts and assigned to each CPU, because sometimes it just gets blocked in one 20-minute execution with no other element of `long_list_of_filenames` being processed during those 20 minutes. What I was expecting was for `map` to schedule each element on a CPU core in a FIFO style.

Is there a better approach for my case?
The `map` method only returns when all operations have finished.

And printing from a pool worker is not ideal. For one thing, files like `stdout` use buffering, so there might be a variable amount of time between printing a message and it actually appearing. Furthermore, since all workers inherit the same `stdout`, their output would become intermeshed and possibly even garbled.

So I would suggest using `imap_unordered` instead. That returns an iterator that will begin yielding results as soon as they are available. The only catch is that it returns results in the order they finish, not in the order they started.
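If it helps to see that behaviour in isolation, here is a minimal, self-contained sketch (the task names and durations are invented for illustration; they stand in for your downloads of very different lengths):

```python
import time
from multiprocessing import Pool

def work(task):
    name, duration = task
    time.sleep(duration)          # stand-in for the real download/processing
    return (name, f'done after {duration}s')

if __name__ == '__main__':
    # Deliberately mixed durations: 'slow' takes much longer than the rest.
    tasks = [('slow', 3), ('fast1', 0.2), ('fast2', 0.4), ('fast3', 0.1)]
    with Pool(2) as p:
        # Results are yielded as each task completes, not in submission order,
        # so the fast tasks print long before 'slow' does.
        for name, status in p.imap_unordered(work, tasks):
            print(name, status)
```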
Your worker function (`get_data_and_process_it`) should return some kind of status indicator, for example a tuple of the filename and the result.
```python
def get_data_and_process_it(filename):
    ...
    if error:
        return (filename, f'has *failed* because of {reason}')
    return (filename, 'has been processed')
```
You could then do:
```python
with Pool(8) as p:
    for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
        print(fn, res)
```
That gives accurate information about when a job finishes, and since only the parent process writes to `stdout`, there is no chance of the output becoming garbled.
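It is also worth noting that `map` picks a chunk size automatically for long iterables, so whole groups of filenames get handed to a single worker at once, which matches the blocking you observed. `imap_unordered` uses a default `chunksize` of 1, so filenames are dispatched to workers one at a time as each worker becomes free.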
Additionally, I would suggest calling `sys.stdout.reconfigure(line_buffering=True)` somewhere near the beginning of your program. That ensures the `stdout` stream will be flushed after every line of output.
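For example, a minimal sketch of where that call could go (assuming Python 3.7+, where `reconfigure()` is available, and the same `get_data_and_process_it` and `long_list_of_filenames` as above):

```python
import sys
from multiprocessing import Pool

if __name__ == '__main__':
    # Flush stdout after every newline so each progress line shows up promptly.
    sys.stdout.reconfigure(line_buffering=True)

    with Pool(8) as p:
        for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
            print(fn, res)
```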