Tags: python, parallel-processing, multiprocessing, python-multiprocessing, process-pool

multiprocessing.Pool: How to start new processes as old ones finish?


I'm using a multiprocessing Pool to manage tesseract processes (OCRing pages of microfilm). Very often, in a Pool of, say, 20 tesseract processes, a few pages are harder to OCR, so those processes take much longer than the others. In the meantime, the pool just hangs and most of the CPUs sit idle. I want these stragglers to be left to continue, but I also want to start more processes to fill the many other CPUs that are idle while these few sticky pages finish. My question: is there a way to start new processes to use those idle CPUs? In other words, can the empty slots in the Pool be filled before waiting for the whole pool to complete?

I could use the async version of starmap and then load up a new pool when the current pool has gone down to a certain number of living processes. But this seems inelegant. It would be more elegant to automagically keep slotting in processes as needed.

Here's what my code looks like right now:

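import logging
import os
import subprocess
from multiprocessing import Pool
from string import Template

def getMpBatchMap(fileList, commandTemplate, concurrentProcesses):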
    mpBatchMap = []
    for i in range(concurrentProcesses):
        fileName = fileList.readline()
        if fileName:
            mpBatchMap.append((fileName, commandTemplate))
    return mpBatchMap

def executeSystemProcesses(objFileName, commandTemplate):
    objFileName = objFileName.strip()
    logging.debug(objFileName)
    objDirName = os.path.dirname(objFileName)
    command = commandTemplate.substitute(objFileName=objFileName, objDirName=objDirName)
    logging.debug(command)
    subprocess.call(command, shell=True)

def process(FILE_LIST_FILENAME, commandTemplateString, concurrentProcesses=3):
    """Go through the list of files and run the provided command against them
    in batches of concurrentProcesses files, each batch running in parallel.
    Template string maps the terms $objFileName and $objDirName.

    Example:
    >>> process(FILE_LIST_FILENAME, 'convert -scale 256 "$objFileName" "$objDirName/TN.jpg"')
    """
    commandTemplate = Template(commandTemplateString)
    with open(FILE_LIST_FILENAME) as fileList:
        while True:
            # Get a batch of x files to process
            mpBatchMap = getMpBatchMap(fileList, commandTemplate, concurrentProcesses)
            # Process them
            logging.debug('Starting MP batch of %i' % len(mpBatchMap))
            if mpBatchMap:
                with Pool(concurrentProcesses) as p:
                    poolResult = p.starmap(executeSystemProcesses, mpBatchMap)
                    logging.debug('Pool result: %s' % str(poolResult))
            else:
                break

Solution

  • You're mixing something up here. The pool always keeps the specified number of processes alive. As long as you don't close the pool, either manually or by leaving the with-block of the context manager, there is no need to refill it with processes, because they're not going anywhere.

    What you probably mean is tasks: the units of work these processes pick up. A task is a chunk of the iterable you pass to a pool method (its size is controlled by the chunksize argument), and each chunk is handled by a single worker process. And yes, there's a way to let idle processes in the pool start on new tasks before all previously enqueued tasks have been processed. You already picked the right tool for this: the async versions of the pool methods. All you have to do is call an async pool method again with the new arguments, without closing the pool in between.

    from multiprocessing import Pool
    import os
    
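    def busy_foo(x):
        """Burn CPU for x loop iterations, then report which worker did it."""
        x = int(x)
        for _ in range(x):
            x - 1  # no-op arithmetic, only there to keep the CPU busy
        print(os.getpid(), ' returning: ', x)
        return x
    
    if __name__ == '__main__':
    
        # zip() over a single list yields 1-tuples, which is what starmap expects.
        # Each list holds two slow and two fast tasks.
        arguments1 = zip([222e6, 22e6] * 2)
        arguments2 = zip([111e6, 11e6] * 2)
    
        with Pool(4) as pool:
    
            # Both calls return immediately, so all eight tasks are queued at once
            results = pool.starmap_async(busy_foo, arguments1)
            results2 = pool.starmap_async(busy_foo, arguments2)
    
            # .get() blocks until the corresponding batch has finished
            print(results.get())
            print(results2.get())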
    

    Example Output:

    3182  returning:  22000000
    3185  returning:  22000000
    3185  returning:  11000000
    3182  returning:  111000000
    3182  returning:  11000000
    3185  returning:  111000000
    3181  returning:  222000000
    3184  returning:  222000000
    [222000000, 22000000, 222000000, 22000000]
    [111000000, 11000000, 111000000, 11000000]
    

    Note that processes 3182 and 3185, which ended up with the easier tasks, immediately start on tasks from the second argument list, without waiting for 3181 and 3184 to finish first.

    If, for some reason, you really want fresh processes after a certain number of tasks per process, use the maxtasksperchild parameter of Pool. It specifies how many tasks a worker process completes before the pool replaces it with a new one. The default is None, which means the Pool never replaces its worker processes; they live as long as the pool does.
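To see what maxtasksperchild changes, the sketch below records which worker PID ran each task. The helper names worker_pid and pids_used are hypothetical, chosen only for this illustration, and chunksize=1 is set so that every item counts as exactly one task. With the default of None, a 2-process pool runs all six tasks on at most two PIDs, which also shows that the pool's processes stay alive between tasks; with maxtasksperchild=1, each worker exits after a single task and the pool starts a replacement, so a new PID shows up for every task.

```python
import os
from multiprocessing import Pool


def worker_pid(_):
    """Return the PID of the worker process that ran this task."""
    return os.getpid()


def pids_used(n_tasks, maxtasksperchild=None):
    """Run n_tasks tiny tasks on a 2-process pool and collect the worker PIDs."""
    with Pool(2, maxtasksperchild=maxtasksperchild) as pool:
        # chunksize=1: each item is its own task, so maxtasksperchild counts items
        return pool.map(worker_pid, range(n_tasks), chunksize=1)


if __name__ == "__main__":
    print("default:", sorted(set(pids_used(6))))
    print("maxtasksperchild=1:", sorted(set(pids_used(6, maxtasksperchild=1))))
```

Replacing workers costs a process start-up per replacement, so it is mainly useful when long-lived workers would otherwise accumulate memory or other resources.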
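Applied to the question's code, the same idea means creating one Pool for the whole file list instead of a new Pool per batch. The following is a minimal sketch under that assumption, not the asker's code or a prescribed library pattern: run_command and process_all are hypothetical names, and chunksize=1 is an assumed setting that makes each file its own task, so a worker that finishes an easy page moves straight on to the next file. Because every file is submitted in a single call, plain starmap is enough here; starmap_async would behave the same way if the main process had other work to do in the meantime.

```python
import os
import subprocess
from multiprocessing import Pool
from string import Template


def run_command(obj_file_name, command_template):
    """Run the templated shell command for one file and return its exit code.

    Hypothetical stand-in for the question's executeSystemProcesses.
    """
    obj_file_name = obj_file_name.strip()
    obj_dir_name = os.path.dirname(obj_file_name)
    command = command_template.substitute(
        objFileName=obj_file_name, objDirName=obj_dir_name
    )
    return subprocess.call(command, shell=True)


def process_all(file_names, command_template_string, concurrent_processes=3):
    """Run the command for every file, using ONE pool for the whole list."""
    template = Template(command_template_string)
    # Skip blank lines, e.g. a trailing newline at the end of the list file
    tasks = [(name, template) for name in file_names if name.strip()]
    with Pool(concurrent_processes) as pool:
        # chunksize=1: every file is a separate task, so an idle worker pulls
        # the next file from the queue instead of waiting for the slowest
        # page of a "batch" to finish.
        return pool.starmap(run_command, tasks, chunksize=1)
```

With the question's list file, the call would sit under an if __name__ == '__main__': guard, as in the example above, and could pass the open file object directly, since iterating it yields one file name per line: with open(FILE_LIST_FILENAME) as fileList: exitCodes = process_all(fileList, commandTemplateString, 20).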