Python multiprocessing pool: How to get all processes running currently


Using Python 3's multiprocessing Pool, how can I find out which jobs the worker processes are currently executing? In the simplified example below, the argument x doubles as the ID of a job, and that ID is what I'm looking for.

from multiprocessing import Pool

done_tasks = []

def f(x):
    return x*x

def on_task_done(x):
    done_tasks.append(x)

if __name__ == '__main__':
    pool = Pool(processes=3)
    ids = [0, 1, 2, 3, 4, ...]
    for id in ids:
        # args must be a tuple; f is the worker function defined above
        pool.apply_async(f, (id,), callback=on_task_done)

    # wait and print currently running processes
    while True:
        # TODO print currently running processes

        if len(done_tasks) >= len(ids):
            break

Specifically, how can I find the IDs of the jobs that the worker processes are executing at any given point in time (e.g., while waiting in a loop for all jobs submitted to the pool to complete)?


Solution

  • You can use a Manager instance to create a shared dict in which each worker records the ID of the job it is currently running. Here is how you can do this:

    import uuid
    import time
    import random
    from multiprocessing import Pool, Manager
    
    
    done_tasks = []
    
    
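    def square(x, procs):
        # Despite its name, pid is a random job ID, not an OS process ID.
        pid = uuid.uuid4().hex
        # Mark the job as running in the shared dict.
        procs[pid] = True
    
        # Simulate a long-running task:
        time.sleep(random.randint(1, 3))
    
        # Mark the job as finished; the entry stays in the dict.
        procs[pid] = False
    
        return x * x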
    
    
    def on_task_done(results):
        done_tasks.append(results)
    
    
    def main():
        m = Manager()
        procs = m.dict()
        pool = Pool(processes=2)
    
        for x in range(10):
            pool.apply_async(square, args=(x, procs), callback=on_task_done)
    
        while len(done_tasks) < 10:
            pids = [pid for pid, running in procs.items() if running]
            print('running jobs:', pids)
            time.sleep(1)
    
        print('results:', done_tasks)
    
        pool.close()
        pool.join()
    
    
    if __name__ == '__main__':
        main()
    

    Output:

    running jobs: []
    running jobs: ['949784a09e1d4f09b647cae584e86277', 'de3c23dd3c174e9a8d7f5c2a7d89a183']
    running jobs: ['de3c23dd3c174e9a8d7f5c2a7d89a183']
    running jobs: ['47f536e9e3584e808bd7475ad43ee49a']
    running jobs: ['e06d73341d774f86a8a00d0bb2914642', '874b194b548e4f47b02859b4f704a4f2']
    running jobs: ['874b194b548e4f47b02859b4f704a4f2', 'cb95a6bcc8ff47bba271cb845a919245']
    running jobs: ['3c9fbf7e4c604dedae033b7ef93c3523', '4863c18062504a8190b415bc42874bb7']
    running jobs: ['3c9fbf7e4c604dedae033b7ef93c3523', '4863c18062504a8190b415bc42874bb7']
    running jobs: ['3c9fbf7e4c604dedae033b7ef93c3523', '4863c18062504a8190b415bc42874bb7']
    running jobs: ['2f896b86aa1647fea63c522fb2c67684', 'aba21db1b1af4fd8896a6b59534a254f']
    running jobs: ['2f896b86aa1647fea63c522fb2c67684']
    running jobs: ['2f896b86aa1647fea63c522fb2c67684']
    results: [0, 1, 4, 9, 16, 25, 36, 49, 81, 64]