
Python multiprocessing pool: force distribution of processes


This question comes as a result of trying to combine logging with a multiprocessing pool. Under Linux there is nothing to do; the module containing my pool worker method inherits the main app logger properties. Under Windows I have to initialize the logger in each process, which I do by running pool.map_async with the initializer method. The problem is that the method runs so quickly that it gets executed more than once in some processes and not at all in others. I can get it to work properly if I add a short time delay to the method but this seems inelegant.

Is there a way to force the pool to distribute the processes evenly?

(some background: http://plumberjack.blogspot.de/2010/09/using-logging-with-multiprocessing.html)

The relevant code is as follows; I can't really post the whole module ;-). The call looks like this:

# Set up logger on Windows platforms
if os.name == 'nt':
    _ = pool.map_async(ml.worker_configurer,
                       [self._q for _ in range(mp.cpu_count())])

The function ml.worker_configurer is this:

import logging
from logging.handlers import QueueHandler  # Python 3.2+; on older versions use the QueueHandler from the linked post

def worker_configurer(queue, delay=True):
    # Ship this process's log records back to the main process via the queue.
    h = QueueHandler(queue)
    root = logging.getLogger()
    root.addHandler(h)
    root.setLevel(logging.DEBUG)
    if delay:
        import time
        time.sleep(1.0)  # crude workaround so each pooled call lands in a different worker
    return
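
For reference: multiprocessing.Pool also takes initializer and initargs arguments and runs that callable exactly once in each worker process as it starts, which avoids having to distribute the configuration calls through map_async at all. A minimal sketch of that variant, reusing the names from the call above, might look like this:

# Sketch, not the original code: configure logging once per worker at pool creation
if os.name == 'nt':
    pool = mp.Pool(processes=mp.cpu_count(),
                   initializer=ml.worker_configurer,
                   initargs=(self._q, False))   # delay no longer needed
else:
    pool = mp.Pool(processes=mp.cpu_count())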

New worker configurer, guarded so that repeated calls in the same process don't attach duplicate handlers:

def worker_configurer2(queue):
    root = logging.getLogger()
    if not root.handlers:
        # configure only if this process has not been configured yet,
        # so repeated calls don't add duplicate handlers
        h = QueueHandler(queue)
        root.addHandler(h)
        root.setLevel(logging.DEBUG)
    return
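
For completeness (this is a sketch with invented handler and formatter settings, not part of the original module): the queue-based handlers above only help if something in the main process drains the queue and writes the records out. On Python 3.2+ logging.handlers ships a QueueListener that does in the standard library roughly what the listener in the linked post does by hand:

import logging
import logging.handlers
import multiprocessing as mp

# Sketch of the main-process side: drain the queue that the workers'
# QueueHandlers write to, and pass each record on to a real handler.
log_queue = mp.Queue()
file_handler = logging.FileHandler('app.log')   # placeholder destination
file_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(processName)s %(levelname)s %(message)s'))
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
# ... create the pool, hand log_queue to the worker configurer, run the jobs ...
listener.stop()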

Solution

You can do something like this:

import logging
import multiprocessing

sub_logger = None

def get_logger():
    global sub_logger
    if sub_logger is None:
        # configure the logger -- this branch runs at most once per worker process
        sub_logger = logging.getLogger()
        sub_logger.setLevel(logging.DEBUG)
    return sub_logger

def worker1(item):
    logger = get_logger()
    # DO WORK

def worker2(item):
    logger = get_logger()
    # DO WORK

if __name__ == '__main__':
    # some_data is whatever iterable of work items you are mapping over;
    # the __main__ guard matters on Windows, where workers re-import this module
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = pool.map_async(worker1, some_data)
    result.get()
    result = pool.map_async(worker2, some_data)
    result.get()
    # and so on and so forth

Because each process has its own memory space (and thus its own set of global variables), you can set the initial global logger to None and only configure the logger if it has not already been configured in that process.
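
To make that point concrete, here is a small self-contained demo (the names and the basicConfig settings are made up for illustration): each worker process lazily configures logging exactly once, no matter how unevenly the pool hands out the individual tasks.

import logging
import multiprocessing
import os

_configured = False  # every worker process gets its own copy of this global

def work(item):
    global _configured
    if not _configured:
        # runs at most once per worker process
        logging.basicConfig(level=logging.DEBUG,
                            format='%(process)d %(levelname)s %(message)s')
        _configured = True
    logging.getLogger(__name__).debug('pid %s handled item %s', os.getpid(), item)
    return item

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(work, range(20))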