Search code examples
pythonsubprocess

How to stream Python sub-process output when using 'stdin'


When using stdin=PIPE with subprocess.Popen(), is there a way to read stdout and stderr as they are streamed?

When not using stdin=PIPE I have been successfully reading the output from stdout and stderr, as described here. However, now I need to include stdin=PIPE, and it seems that the only option is to wait for the process to finish and then use the tuple returned from p.communicate().

The code indicated in the example below worked before using stdin=PIPE, but now fails with ValueError: I/O operation on closed file..

Can this method still be used somehow, or is the only way to use out and err from p.communicate()?

from subprocess import Popen, PIPE, CalledProcessError, TimeoutExpired


def log_subprocess_output(pipe, func=logger.info) -> None:
    '''Log subprocess output from a pipe, one ``func`` call per line.

    Accepts both binary pipes (lines are ``bytes``) and pipes opened in text
    mode, e.g. ``Popen(..., text=True)`` (lines are already ``str``).

    Args:
        pipe: Iterable of output lines, typically ``p.stdout`` or ``p.stderr``.
        func: Callable that receives each line with trailing whitespace
            removed. Defaults to ``logger.info``.
    '''

    for line in pipe:
        # Text-mode pipes yield str, which has no .decode(); only bytes
        # coming from a binary pipe need decoding.
        if isinstance(line, bytes):
            line = line.decode()
        func(line.rstrip())


# Failing example from the question: start the process, pass `input` on stdin,
# then try to log stdout and stderr line by line.
# (`args`, `input` and `logger` are not defined in this snippet.)
try:
    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True)
    # Sends `input` to the process and waits up to 10 seconds for it to finish;
    # the returned `out` and `err` are not used below.
    out, err = p.communicate(input=input, timeout=10)

    # This worked before using `stdin=PIPE`.
    # Can this method still be used somehow, or is the only way to use `out` and `err` from `p.communicate()`?
    # NOTE(review): these reads happen after communicate() has returned. Per
    # the question text this is where `ValueError: I/O operation on closed
    # file.` is raised -- presumably because communicate() has already
    # consumed and closed the pipes.
    with p.stdout:
        log_subprocess_output(p.stdout)
    with p.stderr:
        log_subprocess_output(p.stderr, func=logger.error)

except TimeoutExpired as e:
    logger.error(f'My process timed out: {e}')
    # The process is killed but not waited on afterwards.
    p.kill()

except CalledProcessError as e:
    # NOTE(review): nothing visible in this snippet raises CalledProcessError.
    logger.error(f'My process failed: {e}')

except Exception as e:
    # Any other error ends up here, including the ValueError described above
    # (it matches neither of the two handlers before this one).
    logger.error(f'An exception occurred: {e}')

Solution

  • Thanks to @Norhther for suggesting threading

    Here is a basic working example (that can probably be improved) that runs a process using p.communicate() to pass input to stdin, while a thread reads stdout and prints anything it finds.

    In this example stderr is redirected to stdout; in reality I'll have two threads reading those streams separately and logging at the relevant log level (e.g. logging.info() or logging.error()).

    from subprocess import CalledProcessError, TimeoutExpired, Popen, PIPE, STDOUT
    import threading
    
    def invoke_subporcess(args: list, input: str=None, timeout: int=300) -> 'int | None':
        '''Run ``args`` as a subprocess, feed ``input`` to its stdin and print its output live.

        stderr is merged into stdout. Every output line is printed, stripped and
        prefixed with ``[INFO]``, as soon as it is read.

        ``p.communicate()`` is deliberately not used here: it reads (and finally
        closes) ``p.stdout`` itself, so a separate reader thread on the same pipe
        would race with it and could lose lines or hit a closed file. Instead,
        one thread reads stdout, another writes stdin, and this function only
        waits for the process to exit.

        Args:
            args: Command and arguments passed to ``Popen``.
            input: Text written to the child's stdin; ``None`` sends nothing.
                stdin is closed afterwards in either case.
            timeout: Seconds to wait for the process to exit before it is killed.

        Returns:
            ``0`` when the process exits successfully. ``None`` when it timed
            out, exited with a non-zero code (including death by signal) or
            could not be run; the error is printed in those cases.
        '''

        def log_subprocess_output(stream) -> None:
            '''Print every line from ``stream`` until EOF, then close it.'''
            with stream:
                for line in stream:
                    print(f'[INFO] {line.strip()}')

        def feed_stdin(stream, data) -> None:
            '''Write ``data`` to the child's stdin and close it to signal EOF.'''
            try:
                if data:
                    stream.write(data)
            except OSError:
                # The child exited or closed stdin before reading everything;
                # communicate() ignores this situation as well.
                pass
            finally:
                try:
                    stream.close()
                except OSError:
                    pass

        p = None
        workers = []

        def join_workers() -> None:
            '''Wait (bounded by ``timeout``) for the I/O threads to finish.'''
            for worker in workers:
                worker.join(timeout=timeout)

        try:
            # Configure the container image action.
            p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=STDOUT, text=True)

            # Start the I/O threads. Writing stdin from its own thread keeps a
            # large input from blocking this function past `timeout`.
            for target, name, kwargs in (
                (log_subprocess_output, 'read_stdout', {'stream': p.stdout}),
                (feed_stdin, 'write_stdin', {'stream': p.stdin, 'data': input}),
            ):
                worker = threading.Thread(target=target, name=name, kwargs=kwargs)
                worker.start()
                workers.append(worker)

            # Wait for the container image action to finish.
            p.wait(timeout=timeout)

            # Let the reader drain the remaining output before reporting.
            join_workers()

            # Check the exit code and raise an error if necessary. Negative
            # codes (process killed by a signal) are failures too.
            if p.returncode != 0:
                raise CalledProcessError(p.returncode, ' '.join(p.args))

            return p.returncode

        except TimeoutExpired as e:
            print(f'[Error] My process timed out: {e}')
            p.kill()
            p.wait()
            # Killing the child closes its pipe ends, so the threads see EOF.
            join_workers()

        except CalledProcessError as e:
            print(f'[Error] My process failed: {e}')

        except Exception as e:
            print(f'[Error] An exception occurred: {e}')
            # Do not leave a started child running after an unexpected error.
            if p is not None and p.poll() is None:
                p.kill()
                p.wait()
                join_workers()
    
    if __name__ == "__main__":
        # Example run: log in to a container registry, handing the password
        # to the command through stdin (``--password-stdin``).
        registry = "my-container-registry.com"
        secret = "super-secret-password"
        login_cmd = ['docker', 'login', registry, '--username', 'AWS', '--password-stdin']

        invoke_subporcess(login_cmd, input=secret)