
Python3: a subprocess sleeps when started by Popen


I need to run two commands in parallel. Each takes a long time to run (about 1 minute for one and almost 2 minutes for the other), and both write a lot of data to stdout and stderr (about 300 kB on stderr and several MB on stdout). I have to capture both streams.

I used to run them with subprocess.run(), but that executes them one after the other. Since both commands are single-threaded, I thought of running them in parallel with Popen.
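
For reference, the sequential approach can be sketched as below. The real command lines are elided in the question, so the two Python one-liners in CMDS are hypothetical stand-ins. subprocess.run() never hits a pipe deadlock because it reads both streams with communicate() internally, but each call returns only after its process exits, so the total time is roughly the sum of both runtimes.

```python
import subprocess
import sys

# Hypothetical stand-in commands; the real ones are elided in the question.
CMDS = [
    [sys.executable, "-c", "import sys; print('out 1'); print('err 1', file=sys.stderr)"],
    [sys.executable, "-c", "import sys; print('out 8'); print('err 8', file=sys.stderr)"],
]

def run_sequentially(cmds):
    results = []
    for cmd in cmds:
        # run() drains stdout and stderr via communicate(), so large output
        # cannot deadlock, but the loop blocks until this process has exited
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
        results.append((proc.stdout, proc.stderr))
    return results

if __name__ == "__main__":
    for out, err in run_sequentially(CMDS):
        print(out, end="")
```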

Unfortunately, the simple way doesn't work:

import shlex
import subprocess
import sys

class test:
    def __init__(self, param):
        cmd = "... %d" % param  # the command line parametrized with param
        self.__p = subprocess.Popen(shlex.split(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    
    def waitTerm(self, t=None):
        self.__p.wait(t)
        if self.__p.returncode != 0:
            print(self.__p.stderr.read(), file=sys.stderr)
            raise Exception('failure')
        self.o = self.__p.stdout.read()

t1 = test(1)
t8 = test(8)

t1.waitTerm()  # t1 should take longer
t8.waitTerm()

# here I can use stdout from both process
print(t1.o) # this is an example

The processes hang in the sleeping state. I believe this is caused by the pipe buffers filling up.
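
This diagnosis matches the warning in the subprocess documentation: Popen.wait() can deadlock with stdout=PIPE or stderr=PIPE when the child writes more than the OS pipe buffer can hold, because the child blocks on write() and never exits. A minimal, self-contained demonstration, assuming a hypothetical Python child that writes 1 MB to stdout (far more than a typical pipe buffer): wait() with a timeout expires because nothing is reading, while communicate() drains the pipe and lets the child finish.

```python
import subprocess
import sys

# Hypothetical child: writes 1 MB to stdout, far more than a typical pipe buffer.
CHILD = [sys.executable, "-c", "import sys; sys.stdout.write('x' * 1_000_000)"]

def demo(timeout=2.0):
    p = subprocess.Popen(CHILD, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    try:
        # Nothing reads the pipe, so the child blocks on write() and cannot exit.
        p.wait(timeout=timeout)
        timed_out = False
    except subprocess.TimeoutExpired:
        timed_out = True
    # communicate() reads stdout and stderr until EOF, which unblocks the child.
    out, err = p.communicate()
    return timed_out, len(out), p.returncode

if __name__ == "__main__":
    print(demo())
```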

In this case what is the smartest thing to do?


Solution

  • Threading and reading output as it is produced

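    Start both subprocesses with Popen, then drain each one's pipes in its own thread.

    Each thread calls subprocess.Popen.communicate(), which reads stdout and stderr concurrently until EOF and then waits for the process to exit. Because the pipes are read while the child is still writing, neither buffer fills up and the child never blocks. Running one communicate() per thread lets both processes make progress at the same time.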

    Here's how:

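    import shlex
    import subprocess
    import sys
    import threading
    
    class Test:
        def __init__(self, param):
            cmd = "... %d" % param  # parametrized with param
            self.process = subprocess.Popen(shlex.split(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            self.stdout = self.stderr = None
            self.thread = None
    
        def reader_thread(self):
            # communicate() reads stdout and stderr concurrently until EOF,
            # then waits for the process to exit, so no pipe can fill up
            self.stdout, self.stderr = self.process.communicate()
    
        def run(self):
            # Keep a reference to the thread so join() can wait for it
            self.thread = threading.Thread(target=self.reader_thread)
            self.thread.start()
    
        def join(self):
            # Block until the output is fully collected, then check the exit status
            self.thread.join()
            if self.process.returncode != 0:
                print(self.stderr, file=sys.stderr)
                raise RuntimeError('failure')
    
    t1 = Test(1)
    t8 = Test(8)
    
    t1.run()
    t8.run()
    
    # Wait for both reader threads to finish before using t1.stdout / t8.stdout
    t1.join()
    t8.join()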
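
A shorter variant of the same idea, swapping in concurrent.futures.ThreadPoolExecutor instead of managing threads by hand, is sketched below. Each worker calls subprocess.run(), which drains its own pipes with communicate(), and the pool runs both calls at once. The commands in CMDS are hypothetical stand-ins (each sleeps 1 s and writes 300 kB to stderr) for the elided real command lines.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

def run_capture(cmd):
    # run() uses communicate() internally, so both pipes are drained while
    # the process runs; check=True raises CalledProcessError on failure
    proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return proc.stdout, proc.stderr

def run_parallel(cmds):
    # One worker per command; the workers spend their time blocked on pipe I/O,
    # which releases the GIL, so draining both processes at once costs little
    with ThreadPoolExecutor(max_workers=len(cmds)) as pool:
        return list(pool.map(run_capture, cmds))

# Hypothetical stand-ins: each child sleeps 1 s and writes 300 kB to stderr.
CMDS = [
    [sys.executable, "-c",
     f"import sys, time; time.sleep(1); sys.stderr.write('e' * 300_000); print({n})"]
    for n in (1, 8)
]

if __name__ == "__main__":
    (out1, err1), (out8, err8) = run_parallel(CMDS)
    print(out1, out8, len(err1), len(err8))
```

pool.map() returns results in the order of CMDS, and if either command exits with a non-zero status the CalledProcessError is re-raised when the results are collected.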