
Is there a way to know that a pathos/multiprocessing worker is finished?


I'd like to know when workers finish so that I can free up resources as the last action of each worker. Alternatively, I can free up these resources in the main process, but I need to free them after each worker finishes, one by one (rather than once, after all of the workers have finished).

I'm running my workers as below, tracking progress and PIDs used:

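```python
import logging
from time import sleep

from pathos.multiprocessing import ProcessingPool

log = logging.getLogger(__name__)

# num_workers, exec_func, exec_args, get_pid() and cls.hard_kill_pool() are my own
# definitions (this code runs inside a classmethod, hence `cls`).
pool = ProcessingPool(num_workers)
pool.restart(force=True)
# Loading PIDs of workers with my get_pid() function.
# Note: map() may hand several of these calls to the same worker, so duplicates are possible.
pids = pool.map(get_pid, range(num_workers))
try:
    # amap() returns immediately with an asynchronous result object.
    results = pool.amap(exec_func, exec_args)
    counter = 0
    while not results.ready():
        sleep(2)
        if counter % 60 == 0:  # log roughly every two minutes
            log.info('Waiting for children running in pool.amap() with PIDs: {}'.format(pids))
        counter += 1
    results = results.get()
    # Attempting to close pool...
    pool.close()
    # The purpose of join() is to ensure that a child process has completed
    # before the main process does anything.
    # Attempting to join pool...
    pool.join()
except:
    # Try to terminate the pool in case some worker PIDs still run, then re-raise:
    cls.hard_kill_pool(pids, pool)
    raise
```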

Because of load balancing, it is hard to know which job will be the last one on a given worker. Is there any way to know that some workers are already inactive?

I'm using pathos version 0.2.0.


Solution

  • I'm the pathos author. If you need to free up resources after each worker in a Pool is done running, I'd suggest you not use a Pool. A Pool is meant to allocate resources and keep using them until all jobs are done. Instead, I'd suggest using a for loop that spawns a Process and then ensures that the spawned Process is joined when you are done with it. If you need to do this within pathos, the Process class is at the horribly named pathos.helpers.mp.Process (or, much more directly, at multiprocess.Process from the multiprocess package).
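A minimal sketch of the one-Process-per-job approach, assuming Python 3 and that either multiprocess (the package pathos builds on) or the standard-library multiprocessing is installed; both expose the same Process, Pipe and connection.wait API. The job function `work` and the cleanup hook `release_resources` are hypothetical placeholders for your own code. Each job gets its own Process plus a one-way Pipe; the parent uses `wait()` to react to whichever child reports first, joins that child, and frees its resources right away instead of after all jobs are done.

```python
import os

try:
    # multiprocess is the dill-based fork of multiprocessing used by pathos
    from multiprocess import Pipe, Process
    from multiprocess.connection import wait
except ImportError:
    # fall back to the standard library, which has the same API
    from multiprocessing import Pipe, Process
    from multiprocessing.connection import wait


def work(job, conn):
    """Hypothetical job: square the input and send (job, result, pid) back."""
    try:
        conn.send((job, job * job, os.getpid()))
    finally:
        conn.close()


def release_resources(job, pid, freed):
    """Hypothetical per-worker cleanup, run in the parent right after each join."""
    freed.append((job, pid))


def run_jobs(jobs):
    running = {}  # receiving end of each pipe -> (job, Process)
    for job in jobs:
        recv_end, send_end = Pipe(duplex=False)
        p = Process(target=work, args=(job, send_end))
        p.start()
        # The parent keeps only the receiving end, so it sees EOF if the child dies.
        send_end.close()
        running[recv_end] = (job, p)

    results, freed = {}, []
    while running:
        # Block until at least one child has sent its result (or exited).
        for conn in wait(list(running)):
            job, p = running.pop(conn)
            try:
                _, value, pid = conn.recv()
                results[job] = value
            except EOFError:
                pid = p.pid  # the child exited without sending a result
            conn.close()
            p.join()  # this particular child is finished...
            release_resources(job, pid, freed)  # ...so free its resources now
    return results, freed


if __name__ == "__main__":
    results, freed = run_jobs(range(5))
    print(sorted(results.items()))
```

This starts one process per job at once; with many jobs you would want to cap how many run concurrently. If the cleanup can happen in the child instead, putting it in the `finally` clause of the job function makes it the last action of each worker.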