Tags: python, parallel-processing, multiprocessing, python-multiprocessing, process-pool

`multiprocessing.Pool.map()` seems to schedule wrongly


I have a function which requests a server, retrieves some data, processes it, and saves a CSV file. This function needs to be launched 20k times. Each execution lasts a different amount of time: sometimes more than 20 minutes, sometimes less than a second. I decided to go with multiprocessing.Pool.map to parallelize the execution. My code looks like:

from multiprocessing import Pool

def get_data_and_process_it(filename):
    print('getting', filename)
    ...
    print(filename, 'has been processed')

with Pool(8) as p:
    p.map(get_data_and_process_it, long_list_of_filenames)

Looking at how the prints are generated, it seems that long_list_of_filenames is being split into 8 parts and assigned to each CPU, because sometimes it just gets blocked on a single 20-minute execution with no other element of long_list_of_filenames being processed during those 20 minutes. What I was expecting was for map to schedule each element on a CPU core in a FIFO style.

Is there a better approach for my case?


Solution

  • The map method only returns when all operations have finished. It also splits the input iterable into chunks up front (its chunksize argument defaults to roughly len(iterable) / (4 * number_of_workers)), and each worker processes its chunk sequentially, which is why a single 20-minute file can hold up the other files assigned to the same worker.
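
    A quick way to see the first point, with a made-up slow() function that just sleeps for a given number of seconds (names here are only for illustration):

    from multiprocessing import Pool
    import time

    def slow(seconds):
        time.sleep(seconds)   # stand-in for work of varying length
        return seconds

    if __name__ == '__main__':
        with Pool(2) as p:
            results = p.map(slow, [3, 1, 2, 1])
            # map() returns only here, after all four calls have finished,
            # and the results come back in input order: [3, 1, 2, 1]
            print(results)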

    And printing from a pool worker is not ideal. For one thing, files like stdout use buffering, so there might be a variable amount of time between printing a message and it actually appearing. Furthermore, since all workers inherit the same stdout, their output would become interleaved and possibly even garbled.

    So I would suggest using imap_unordered instead. It hands out work one item at a time by default (chunksize=1) and returns an iterator that will begin yielding results as soon as they are available. The only catch is that it returns results in the order they finish, not in the order they started.

    Your worker function (get_data_and_process_it) should return some kind of status indicator. For example a tuple of the filename and the result.

    def get_data_and_process_it(filename):
        ...
        if error:
            return (filename, f'has *failed* because of {reason}')
        return (filename, 'has been processed')
    

    You could then do:

    with Pool(8) as p:
        for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
            print(fn, res)
    

    That gives accurate information about when a job finishes, and since only the parent process writes to stdout, there is no chance of the output becoming garbled.
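
    If it turns out that most of your 20k files finish in well under a second, you could also pass an explicit chunksize to imap_unordered to reduce inter-process overhead; chunksize=16 below is only an illustrative value that would need tuning for your workload:

    with Pool(8) as p:
        # chunksize=16 is an example value: larger chunks mean less
        # inter-process overhead, but a slow file again delays the
        # rest of the files in its chunk.
        for fn, res in p.imap_unordered(get_data_and_process_it,
                                        long_list_of_filenames, chunksize=16):
            print(fn, res)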

    Additionally, I would suggest calling sys.stdout.reconfigure(line_buffering=True) somewhere near the beginning of your program. That ensures that the stdout stream is flushed after every line of output.
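
    Putting these pieces together, a minimal end-to-end sketch could look like the following. The actual download and processing is elided, and the if __name__ == '__main__' guard is included because it is required when the start method is spawn (e.g. on Windows):

    import sys
    from multiprocessing import Pool

    def get_data_and_process_it(filename):
        try:
            ...  # request the server, process the data, save the CSV
        except Exception as exc:
            return (filename, f'has *failed* because of {exc}')
        return (filename, 'has been processed')

    if __name__ == '__main__':
        # Flush stdout after every line so progress messages show up immediately.
        sys.stdout.reconfigure(line_buffering=True)
        long_list_of_filenames = [...]  # your 20k filenames
        with Pool(8) as p:
            for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
                print(fn, res)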