Search code examples
Tags: python, python-3.x, multiprocessing, python-3.6, tqdm

Multiprocessing show multiple progress bars


For my program, I have a file that writes random integers to a .CSV file.

from __future__ import absolute_import, division, print_function
from numpy.random import randint as randrange
import os, argparse, time
from tqdm import tqdm

def write_to_csv(filename, *args, newline = True):
    """Append one comma-separated row to ``filename``.csv.

    Each positional argument becomes a field; a list argument is
    flattened so each of its items becomes its own field.

    Args:
        filename: Path without the ``.csv`` extension.
        *args: Values (or lists of values) to write in order.
        newline: When True (default), terminate the row with ``'\\n'``.
    """
    fields = []
    for arg in args:
        # isinstance also accepts list subclasses, unlike type(arg) == list
        if isinstance(arg, list):
            fields.extend(str(i) for i in arg)
        else:
            fields.append(str(arg))
    # ','.join avoids quadratic += concatenation and never produces a
    # stray trailing comma, so no rstrip is needed.
    write_string = ','.join(fields)
    if newline:
        write_string += '\n'
    with open(filename + '.csv', 'a') as file:
        file.write(write_string)

def move_dir(dirname, parent = False):
    """Change the working directory, creating ``dirname`` if needed.

    Args:
        dirname: Directory to enter (converted to str). Ignored when
            ``parent`` is True.
        parent: When True, move up to the parent directory instead.
    """
    if parent:
        os.chdir("..")
        return
    dirname = str(dirname)
    # exist_ok replaces the original try/except FileExistsError dance and
    # the dead `exists = os.path.isfile(...)` check (isfile was the wrong
    # predicate for a directory anyway, and its result was never used).
    os.makedirs(dirname, exist_ok=True)
    os.chdir(dirname)

def calculate_probability(odds, exitmode = False, low_cpu = 0):
    """Run one trial of random draws in range(odds) and log the results.

    Creates/enters Probability/<odds>/, writes every draw to a numbered
    <run>.csv, then appends per-outcome counts and percentages to runs.csv.

    Args:
        odds: Number of possible outcomes; also scales the repetition count.
        exitmode: When True, exit the interpreter after finishing (used by
            the interrupt-recovery path below).
        low_cpu: When truthy, sleep 0.01*low_cpu seconds per draw to
            throttle CPU usage.
    """
    try:
        file_count = 0
        move_dir('Probability')
        move_dir(str(odds))
        d = {}
        writelist = []
        percentlist = []
        for i in tqdm(range(odds)):
            d[str(i)] = 0
            writelist.append(f'Times {i}')
            percentlist.append(f'Percent {i}')
        # Pick the first unused run number for this trial's CSV file.
        while os.path.isfile(str(file_count) + '.csv'):
            file_count += 1
        filename = str(file_count)
        write_to_csv(filename, 'Number', 'Value')
        # 500 draws per possible outcome, capped at 10000.
        rep = min(500 * odds, 10000)
        for i in tqdm(range(rep)):
            ran = int(randrange(odds))
            d[str(ran)] += 1
            # BUG FIX: the *last* row must omit the trailing newline; the
            # original hard-coded `i == 999`, which is only the last row
            # when rep happens to be 1000.
            if i == rep - 1:
                write_to_csv(filename, i, ran+1, newline = False)
            else:
                write_to_csv(filename, i, ran+1)
            if low_cpu:
                time.sleep(0.01*float(low_cpu))
        writelist2 = []
        percentlist2 = []
        for i in tqdm(range(odds)):
            val = d[str(i)]
            writelist2.append(val)
            percentlist2.append(round(((val/rep)*100), 2))
        if os.path.isfile('runs.csv'):
            write_to_csv('runs', file_count, writelist2, percentlist2)
        else:
            # First run: emit the header row before the data row.
            write_to_csv('runs', 'Run #', writelist, percentlist)
            write_to_csv('runs', file_count, writelist2, percentlist2)
        if exitmode:
            exit()
    except (KeyboardInterrupt, SystemExit):
        if exitmode:
            # Interrupted during the recovery re-run: just clean up and die.
            os.remove(str(file_count)+'.csv')
            exit()
        else:
            try:
                os.system('cls')
                print('User/program interrupted, lauching shutdown mode...')
                # Discard the partially written trial, back out of
                # Probability/<odds>/, and run one clean final trial.
                os.remove(str(file_count)+'.csv')
                print('Finilizaing current trial...')
                os.chdir("..")
                os.chdir("..")
            except FileNotFoundError:
                exit()
            calculate_probability(odds, exitmode = True)

I also have a repetition system to do this multiple times.

def run_tests(times, odds, low_cpu = 0, shutdown = False):
    """Run ``times`` sequential trials of calculate_probability(odds).

    Args:
        times: Number of trials to run.
        odds: Passed through to calculate_probability.
        low_cpu: Passed through to throttle each trial.
        shutdown: When True, shut the machine down afterwards (Windows).
    """
    for _ in tqdm(range(times)):
        calculate_probability(odds, low_cpu = low_cpu)
        # calculate_probability leaves us inside Probability/<odds>/;
        # climb back out two levels before the next trial.
        for _ in range(2):
            os.chdir("..")
    if shutdown:
        os.system('shutdown /S /F /T 0 /hybrid')

However, if I were to run like 30 trials, it would take forever. So I decided to use the multiprocessing module to speed up the process. Because each run needs to write to the same file at the end, I had to collect the data and write them after the processes ended.

def calculate_probability(odds, low_cpu = 0):
    """Run one trial of random draws in range(odds) and return the results.

    Multiprocessing variant: instead of appending to the shared runs.csv,
    the header lists and result lists are returned so the parent process
    can write them after all workers finish.

    Args:
        odds: Number of possible outcomes; also scales the repetition count.
        low_cpu: When truthy, sleep 0.01*low_cpu seconds per draw to
            throttle CPU usage.

    Returns:
        Tuple (writelist, percentlist, writelist2, percentlist2): header
        labels and the per-outcome counts / percentages.
    """
    try:
        file_count = 0
        move_dir('Probability')
        move_dir(str(odds))
        d = {}
        writelist = []
        percentlist = []
        for i in tqdm(range(odds)):
            d[str(i)] = 0
            writelist.append(f'Times {i}')
            percentlist.append(f'Percent {i}')
        # Pick the first unused run number for this trial's CSV file.
        while os.path.isfile(str(file_count) + '.csv'):
            file_count += 1
        filename = str(file_count)
        write_to_csv(filename, 'Number', 'Value')
        # 500 draws per possible outcome, capped at 10000.
        rep = min(500 * odds, 10000)
        for i in range(rep):
            ran = int(randrange(odds))
            d[str(ran)] += 1
            # BUG FIX: the *last* row must omit the trailing newline; the
            # original hard-coded `i == 999`, which is only the last row
            # when rep happens to be 1000.
            if i == rep - 1:
                write_to_csv(filename, i, ran+1, newline = False)
            else:
                write_to_csv(filename, i, ran+1)
            if low_cpu:
                time.sleep(0.01*float(low_cpu))
        writelist2 = []
        percentlist2 = []
        for i in range(odds):
            val = d[str(i)]
            writelist2.append(val)
            percentlist2.append(round(((val/rep)*100), 2))
        return (writelist, percentlist, writelist2, percentlist2)
    except (KeyboardInterrupt, SystemExit):
        # Discard the partially written trial file before exiting.
        try:
            os.remove(str(file_count)+'.csv')
        finally:
            exit()

def worker(odds, returndict, num, low_cpu = 0):
    """Process entry point: run one trial and stash its result lists.

    Args:
        odds: Passed through to calculate_probability.
        returndict: Shared Manager dict; the result tuple is stored
            under the key 'write<num>'.
        num: Index of this worker, used only to build the dict key.
        low_cpu: Passed through to throttle the trial.
    """
    result = calculate_probability(odds, low_cpu = low_cpu)
    returndict[f'write{num}'] = result
    # Undo the two directory changes made by calculate_probability.
    os.chdir("..")
    os.chdir("..")
    os.system('cls')

def run_tests(times, odds, low_cpu = 0, shutdown = False):
    """Run ``times`` trials of calculate_probability(odds) in parallel.

    Launches one Process per trial; workers deposit their result tuples
    in a Manager().dict(), and this parent process writes the aggregated
    results to runs.csv after all workers have joined.

    Args:
        times: Number of worker processes / trials to launch.
        odds: Passed through to calculate_probability via worker.
        low_cpu: Passed through to throttle each worker.
        shutdown: When True, shut the machine down afterwards (Windows).
    """
    print('Starting...')
    manager = Manager()
    # Proxy dict shared with the workers; each stores (writelist,
    # percentlist, writelist2, percentlist2) under its own key.
    return_dict = manager.dict()
    job_list = []
    for i in range(times):
        p = Process(target=worker, args=(odds,return_dict,i), kwargs = {'low_cpu' : low_cpu})
        job_list.append(p)
        p.start()

    try:
        for proc in job_list:
            proc.join()
    except KeyboardInterrupt:
        # Give the workers time to handle their own interrupts, then wait
        # for them to finish cleaning up before exiting.
        print('User quit program...')
        time.sleep(5)
        for proc in job_list:
            proc.join()
        exit()
    else:
        # All workers finished cleanly: write the aggregated results.
        move_dir('Probability')
        move_dir(str(odds))
        if not os.path.isfile('runs.csv'):
            # First run ever: write the header labels from any one worker.
            # NOTE(review): this assumes DictProxy.values() supports
            # indexing (returns a plain list) — confirm on the target
            # Python version.
            write_to_csv('runs', return_dict.values()[0][0], return_dict.values()[0][1])
        for value in return_dict.values():
            write_to_csv('runs', value[2], value[3])
        print('Done!')
    finally:
        # Runs on both the success and the interrupt path.
        if shutdown:
            os.system('shutdown /S /F /T 0 /hybrid')

However, when I run this new code, there is one progress bar, and each process overwrites the bar, so the bar is flashing with random numbers, making the bar useless. I want to have a stack of bars, one for each process, that each update without interrupting the others. The bars do not need to be ordered; I just need to have an idea of how fast each process is doing its tasks.


Solution

  • STDOUT is just a stream, and all of your processes are attached to the same one, so there's no direct way to tell it to print the output from different processes on different lines.

    Probably the simplest way to achieve this would be to have a separate process that is responsible for aggregating the status of all the other processes and reporting the results. You can use a multiprocessing.Queue to pass data from the worker processes to the status process, and the status process can then print the status to stdout. If you want a stack of progress bars, you'll have to get a little creative with the formatting (essentially update all the progress bars at the same time and print them in the same order so they appear to stack up).