Tags: python, parallel-processing, subprocess, multiprocessing

Control the number of subprocesses used to call external commands in Python


I understand that using subprocess is the preferred way of calling an external command.

But what if I want to run several commands in parallel while limiting the number of processes being spawned? What bothers me is that I can't block on the subprocesses. For example, if I call

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

then my script continues immediately, without waiting for cmd to finish. Therefore, I can't wrap it up in a worker of the multiprocessing library.

For example, if I do:

import subprocess
from multiprocessing import Pool

def worker(cmd):
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

pool = Pool(processes=10)
results = [pool.apply_async(worker, [cmd]) for cmd in cmd_list]
ans = [res.get() for res in results]

then each worker finishes and returns right after spawning a subprocess, so using Pool doesn't actually limit the number of processes spawned by subprocess.

What's the proper way of limiting the number of subprocesses?


Solution

  • You can use subprocess.call if you want to wait for the command to complete. See pydoc subprocess for more information.
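
    For example, a blocking worker built on subprocess.call might look like this rough sketch (reusing the cmd and outputfile names from the question):

    def worker(cmd):
        # subprocess.call runs cmd and blocks until it exits,
        # returning the command's exit code
        return subprocess.call(cmd, stderr=outputfile, stdout=outputfile)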

    You could also call the Popen.wait method in your worker:

    def worker(cmd):
        p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)
        p.wait()
    

    Because there seems to be some confusion about this answer, here's a complete example:

    import concurrent.futures
    import random
    import subprocess
    
    
    def worker(workerid):
        print(f"start {workerid}")
        p = subprocess.Popen(["sleep", f"{random.randint(1,30)}"])
        p.wait()
        print(f"stop {workerid}")
        return workerid
    
    
    def main():
        tasks = []
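        # max_workers caps how many worker threads, and therefore how many
        # subprocesses, can run at the same time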
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
            for i in range(20):
                tasks.append(pool.submit(worker, i))
    
            print("waiting for tasks...", flush=True)
            for task in concurrent.futures.as_completed(tasks):
                print(f"completed {task.result()}", flush=True)
            print("done.")
    
    
    if __name__ == "__main__":
        main()
    

    If you run the above code, you will see that all of the worker processes start in parallel and that we are able to gather values as they are completed.
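
    Adapted to the original question, the same pattern with a limit of 10 concurrent commands might look like the sketch below. The cmd_list here holds placeholder sleep commands; substitute the real commands (and any output redirection) as needed.

    import concurrent.futures
    import subprocess

    # placeholder commands standing in for the question's cmd_list
    cmd_list = [["sleep", "2"], ["sleep", "1"], ["sleep", "3"]]


    def worker(cmd):
        # block until cmd finishes and return its exit code
        return subprocess.call(cmd)


    # at most 10 commands run at the same time
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        results = list(pool.map(worker, cmd_list))

    print(results)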