
Python multiprocessing on slurm doesn't output print() to console


I wrote a data processing script that I want to run on our cluster. I use slurm to launch it, something like:

srun -c 8 --gres="ht:ht1:8" python -m my_module...

(The gres option is some magic required because the cluster is GPU-oriented; it should not matter in this case.)

Inside my module I have this to handle multiprocessing:

with Pool(processes=num_processes) as pool:
    out = pool.map(
        partial(df_to_stats, **param_dict), frames_uri[0:max_uri_process]
    )
    pool.close()

I have a fixed dict param_dict with the function's parameters, which I bake into the function with partial for pool.map, and frames_uri is my array of data.

The function df_to_stats has some prints inside, but I don't see any output in the console while the script is running. At the same time the script gives correct results; the function returns all the needed data to the out variable, etc.

To be more specific, the prints are a bootleg tqdm-like progress bar (because tqdm didn't work the way I wanted, but that's a whole different issue). However, I tested it, and a plain print("test") doesn't work either:

    with lock:
        progress[0] += 1
    bar_len = 20
    bar_fill = bar_len * progress[0] // progress[1]
    bar = (
        "█" * bar_fill
        + "_" * (bar_len - bar_fill)
        + " "
        + str(100 * progress[0] // progress[1])
        + "%"
    )
    print("\r" + bar, end="")

Solution

  • I am not familiar with slurm and whatever idiosyncrasies it might have, so I don't know how useful my "answer" will be, especially since you say that doing print("test") does not work either. So you may need someone else who is familiar with slurm to resolve your more general printing issue. But I do see one problem with your progress bar printing even assuming you can get general printing to display.

    First some observations:

    You did not post a minimal, reproducible example. Since your problem has to do with printing a progress bar, for this issue you could have used any simple worker function to demonstrate your problem and posted a complete program. You did not do this and so some guesses have to be made.

    It appears that your worker function, df_to_stats, is trying to rewrite the progress bar. If so, you are trying to print on the same line from multiple pool processes. This approach can work, but it is not the "obvious" solution (at least not to me), and I will present some alternate methods later. Regardless of which process does the printing, the print call that creates the progress bar should specify flush=True to force the printed characters to be output immediately. Without flush=True or the printing of a newline character, which I do not see you doing, the output will only appear when the program terminates. So first try adding flush=True and see if that makes a difference. The following is a minimal, reproducible example using flush=True. If your modified code still does not work, try running the code below; if it works, your problem is elsewhere (are you getting an exception prior to printing?):

    from multiprocessing import Pool, Array, Lock
    
    N_TASKS = 20
    
    lock = Lock()
    # These globals are inherited by the workers via fork (the Linux default
    # start method); under the "spawn" start method they would not be shared.
    progress = Array('i', 2)  # progress[0] = completed tasks, progress[1] = total
    progress[1] = N_TASKS
    
    def advance_bar():
        with lock:
            progress[0] += 1
    
        bar_len = 20
        bar_fill = bar_len * progress[0] // progress[1]
        bar = (
            "█" * bar_fill
            + "_" * (bar_len - bar_fill)
            + " "
            + str(100 * progress[0] // progress[1])
            + "%"
        )
        print("\r" + bar, end="", flush=True)  # Note flush=True
    
    def square(x):
        import time
    
        time.sleep(.2)  # Emulate doing some work
        advance_bar()
        return x * x
    
    def main():
        with Pool() as pool:
            out = pool.map(square, range(N_TASKS))
            print()  # Add newline to progress bar
            print(out)  # Print results
    
    if __name__ == '__main__':
        main()
    

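    As an aside on the more general printing issue: when stdout is not connected to a terminal (which is typically the case under a batch scheduler), Python block-buffers it, so output can be held back until the buffer fills or the program exits. The sketch below shows two general-purpose workarounds; I am assuming they apply under slurm, since I cannot test there:

    ```python
    import sys

    # When stdout is a pipe or an output file, Python block-buffers it by
    # default.  Switching to line buffering makes each newline-terminated
    # print appear immediately (Python 3.7+):
    sys.stdout.reconfigure(line_buffering=True)
    print("this line is flushed as soon as it is printed")

    # Note: line buffering flushes on "\n" only, so a "\r"-style progress
    # bar printed with end="" still needs flush=True.

    # Shell-level alternatives (general Python options, not slurm-specific):
    #   python -u -m my_module                  # -u forces unbuffered output
    #   PYTHONUNBUFFERED=1 python -m my_module
    ```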
    The other approach would be to have the main process do all the progress bar printing:

    from multiprocessing import Pool
    
    N_TASKS = 20
    
    progress = [0, N_TASKS]
    
    def advance_bar(result):
        progress[0] += 1
    
        bar_len = 20
        bar_fill = bar_len * progress[0] // progress[1]
        bar = (
            "█" * bar_fill
            + "_" * (bar_len - bar_fill)
            + " "
            + str(100 * progress[0] // progress[1])
            + "%"
        )
        print("\r" + bar, end="", flush=True)  # Note flush=True
    
    def square(x):
        import time
    
        time.sleep(.2)  # Emulate doing some work
        return x * x
    
    def main():
        with Pool() as pool:
            async_results = [pool.apply_async(square, args=(x,), callback=advance_bar) for x in range(N_TASKS)]
            out = [async_result.get() for async_result in async_results]
            print()  # Add newline to progress bar
            print(out)  # Print results
    
    if __name__ == '__main__':
        main()
    

    The only issue with using apply_async with a callback is that it is not very efficient when you are submitting many tasks, since you do not get the automatic "chunking" that the map function provides. Instead you can use the imap_unordered method with an explicit chunksize value. But since results are returned in the order in which tasks complete rather than the order in which they were submitted (which is what you want for the most accurate progress bar advancement), you will need to explicitly pass the index of each result to and from the worker function:

    from multiprocessing import Pool
    
    N_TASKS = 20
    
    progress = [0, N_TASKS]
    
    def advance_bar():
        progress[0] += 1
    
        bar_len = 20
        bar_fill = bar_len * progress[0] // progress[1]
        bar = (
            "█" * bar_fill
            + "_" * (bar_len - bar_fill)
            + " "
            + str(100 * progress[0] // progress[1])
            + "%"
        )
        print("\r" + bar, end="", flush=True)  # Note flush=True
    
    def square(tpl):
        import time
    
        idx, x = tpl  # Unpack
        time.sleep(.2)  # Emulate doing some work
        return idx, x * x  # Return the index along with the result
    
    def main():
        out = [None] * N_TASKS
        with Pool() as pool:
            # For large iterables use something other than chunksize=1:
            for idx, result in pool.imap_unordered(square, enumerate(range(N_TASKS)), chunksize=1):
                out[idx] = result
                advance_bar()
            print()  # Add newline to progress bar
            print(out)  # Print results
    
    if __name__ == '__main__':
        main()
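
    Finally, a note on picking that explicit chunksize: a reasonable starting point is the heuristic that Pool.map itself uses when no chunksize is given, roughly the task count divided by four times the worker count, rounded up. The helper below is my own sketch of that heuristic, not part of the multiprocessing API:

    ```python
    from multiprocessing import cpu_count

    def default_chunksize(n_tasks, n_workers=None):
        """Approximate the chunksize Pool.map computes when none is given."""
        if n_workers is None:
            n_workers = cpu_count()
        # Aim for roughly 4 chunks per worker, rounding up.
        chunksize, extra = divmod(n_tasks, n_workers * 4)
        if extra:
            chunksize += 1
        return max(chunksize, 1)

    # e.g. 20 tasks on 4 workers -> chunks of 2
    ```

    Bear in mind that larger chunks mean the progress bar advances in bursts of chunksize results at a time, so there is a trade-off between dispatch efficiency and the smoothness of the bar.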