python, parallel-processing, queue, subprocess, stdout

Python: run multiple subprocesses with a pool/queue, recover output as soon as one finishes, and launch the next job in the queue


I'm currently launching a subprocess and parsing its stdout on the fly, without waiting for the process to finish.

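import subprocess

for sample in all_samples:
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample), shell=True, stdout=subprocess.PIPE)
    # Iterate over stdout so the loop ends when the tool closes its output
    for myline in my_tool_subprocess.stdout:
        pass  # here I parse stdout..
    my_tool_subprocess.wait()  # reap the finished process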

My script performs this action multiple times, once per input sample.

The main problem is that every subprocess is a program/tool that uses 100% of one CPU while it's running, and it takes some time: maybe 20-40 minutes per input.

What I would like to achieve is a pool or queue (I'm not sure of the exact terminology here) with at most N subprocess jobs running at the same time, so that I can maximize performance instead of proceeding sequentially.

For example, the execution flow for a pool with a maximum of 4 jobs should be:

  • Launch 4 subprocesses.
  • When one of the jobs finishes, parse its stdout and launch the next one.
  • Repeat until all the jobs in the queue are finished.

Even if I can achieve this, I don't know how I would identify which sample's subprocess has finished. At the moment I don't need to identify them, since the subprocesses run sequentially and I parse stdout as each one prints it.

This is really important, since I need to identify the output of each subprocess and assign it to its corresponding input/sample.


Solution

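  • A ThreadPool could be a good fit for your problem: you set the number of worker threads and add jobs, and the threads work their way through all the tasks. Threads are enough here because the CPU-heavy work runs in the child processes, while the Python threads mostly wait on stdout. Each call to work receives its sample as an argument, so whatever is parsed inside that call is already tied to the right sample.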

    from multiprocessing.pool import ThreadPool
    import subprocess


    def work(sample):
        my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample), shell=True, stdout=subprocess.PIPE)
        # Iterate over stdout so the loop ends when the tool closes its output
        for myline in my_tool_subprocess.stdout:
            pass  # here I parse stdout..
        my_tool_subprocess.wait()  # reap the finished process


    num = None  # set to the number of workers you want (it defaults to the cpu count of your machine)
    tp = ThreadPool(num)
    for sample in all_samples:
        tp.apply_async(work, (sample,))

    tp.close()  # no more jobs will be submitted
    tp.join()   # wait for all queued jobs to finish
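
If you also need the parsed output back in the main thread, matched to its sample, one option is to have the worker return the sample together with its results and to collect them with imap_unordered, which yields each result as soon as its job finishes. The following is only a sketch under some assumptions: build_command and run_all are hypothetical helper names, and the command is a stand-in that starts a small Python child process instead of the real mytool, so that the example can run anywhere.

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool


def build_command(sample):
    # Hypothetical stand-in for the real tool: a tiny Python child process
    # that prints one line. Replace with something like ['mytool', sample].
    return [sys.executable, "-c",
            "import sys; print('result for', sys.argv[1])", sample]


def work(sample):
    """Run one subprocess and return (sample, returncode, parsed_lines)."""
    parsed = []
    with subprocess.Popen(build_command(sample),
                          stdout=subprocess.PIPE, text=True) as proc:
        # Lines are read while the child is still running.
        for line in proc.stdout:
            parsed.append(line.rstrip("\n"))  # real parsing would go here
    # Leaving the with-block waits for the child, so returncode is set.
    return sample, proc.returncode, parsed


def run_all(samples, max_jobs=4):
    """Run at most max_jobs subprocesses at a time, keyed by sample."""
    results = {}
    with ThreadPool(max_jobs) as pool:
        # imap_unordered yields results in completion order, not input order.
        for sample, returncode, parsed in pool.imap_unordered(work, samples):
            results[sample] = (returncode, parsed)
    return results


if __name__ == "__main__":
    for sample, (code, lines) in run_all(["a", "b", "c", "d", "e"]).items():
        print(sample, code, lines)
```

Returning the sample from the worker means the completion order does not matter: every result carries its own key. Consuming the iterator also re-raises any exception from a worker, whereas an apply_async call whose result is never fetched fails silently.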