Search code examples
pythonmultiprocessingprogress-bartqdm

Updating a shared tqdm progress bar in python multiprocessing


I want to update a progress bar from inside a spawned process as follows:

import multiprocessing as mp
import random
import time
from tqdm import tqdm

def test(queue, pbar, lock):
    while True:
        x = queue.get()
        if x is None:
            break
        for i in range(x):
            time.sleep(1)
            lock.acquire()
            pbar.update(1)
            lock.release()

queue = mp.Queue()
lock = mp.Lock()
processes = []
pbar = tqdm(total=5050)
for rank in range(4):
    p = mp.Process(target=test, args=(queue, pbar, lock))
    p.start()
    processes.append(p)
pbar.close()

for idx in range(100):
    queue.put(idx)

for _ in range(4):
    queue.put(None)  # sentinel values to signal subprocesses to exit

for p in processes:
        p.join()  # wait for all subprocesses to finish

The above gives inconsistent updates (progess goes up and down). I found this answer, but none of them work for me because I want to update the progress bar inside the test function. How can I do this?


Solution

  • I'd slightly restructure the program:

    1.) Create update_bar process that creates a progress bar and reads from another queue values and updates the bar with these values

    2.) This update process has daemon=True parameter, so it won't block upon exit

    3.) The test processes receives upon start the bar_queue and put values there if they want to update the progress bar.

    import time
    from tqdm import tqdm
    import multiprocessing as mp
    
    
    def test(queue, bar_queue):
        while True:
            x = queue.get()
            if x is None:
                break
            for _ in range(x):
                time.sleep(0.05)
                bar_queue.put_nowait(1)
    
    
    def update_bar(q):
        pbar = tqdm(total=188)
    
        while True:
            x = q.get()
            pbar.update(x)
    
    
    if __name__ == "__main__":
        queue = mp.Queue()
        bar_queue = mp.Queue()
    
        processes = [
            mp.Process(target=test, args=(queue, bar_queue)) for _ in range(4)
        ]
    
        # start update progress bar process
        # daemon= parameter is set to True so this process won't block us upon exit
        bar_process = mp.Process(target=update_bar, args=(bar_queue,), daemon=True)
        bar_process.start()
    
        for p in processes:
            p.start()
    
        for idx in range(20):
            queue.put(idx)
    
        for _ in range(4):
            queue.put(None)  # sentinel values to signal subprocesses to exit
    
        for p in processes:
            p.join()  # wait for all subprocesses to finish
    
        time.sleep(0.5)