When using stdin=PIPE
with subprocess.Popen()
, is there a way to read stdout
and stderr
as they are streamed?
When not using stdin=PIPE
I have been successfully reading the output from stdout
and stderr
, as described here. However, now I need to include stdin=PIPE
and it seems that the only option is to wait for the process to finish, and then use the tuple returned from p.communicate()
?
The code indicated in the example below worked before using stdin=PIPE
, but now fails with ValueError: I/O operation on closed file.
Can this method still be used somehow, or is the only way to use out
and err
from p.communicate()
?
from subprocess import Popen, PIPE, CalledProcessError, TimeoutExpired
def log_subprocess_output(pipe, func=logger.info) -> None:
    '''Log subprocess output from a pipe, one line per call to *func*.

    Iterates *pipe* until end-of-file and passes each line, with trailing
    whitespace (including the newline) removed, to *func*.

    Args:
        pipe: An open, readable, line-iterable stream such as ``p.stdout``
            or ``p.stderr`` of a ``Popen`` object. May yield ``bytes``
            (binary pipe) or ``str`` (pipe opened with ``text=True``).
        func: Callable invoked with each line as a ``str``.
    '''
    for line in pipe:
        # A pipe created with text=True already yields str, which has no
        # .decode(); only binary pipes need decoding.
        if isinstance(line, bytes):
            line = line.decode()
        func(line.rstrip())
# Reproduces the failure described in the question: the pipes are read and
# closed by communicate() before the logging helper gets a chance to use them.
try:
    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True)
    # communicate() writes `input` to stdin, reads stdout and stderr until
    # end-of-file, waits for the process (up to 10 seconds here) and returns
    # the captured output as the tuple (out, err).
    out, err = p.communicate(input=input, timeout=10)
    # This worked before using `stdin=PIPE`.
    # Can this method still be used somehow, or is the only way to use `out` and `err` from `p.communicate()`?
    # NOTE: at this point communicate() has already consumed and closed
    # p.stdout / p.stderr, so entering the `with` below is what raises
    # `ValueError: I/O operation on closed file.`
    with p.stdout:
        log_subprocess_output(p.stdout)
    with p.stderr:
        log_subprocess_output(p.stderr, func=logger.error)
except TimeoutExpired as e:
    # Raised by communicate() when the 10 second timeout elapses; the child is
    # still running at that point, so it is killed explicitly.
    logger.error(f'My process timed out: {e}')
    p.kill()
except CalledProcessError as e:
    # NOTE(review): neither Popen() nor communicate() raises
    # CalledProcessError themselves, so this branch looks unreachable as
    # written - confirm.
    logger.error(f'My process failed: {e}')
except Exception as e:
    # Catch-all; the ValueError from the closed pipes ends up being logged here.
    logger.error(f'An exception occurred: {e}')
Thanks to @Norhther for suggesting threading.
Here is a basic working example (that can probably be improved) that runs a process using p.communicate()
to pass input to stdin
, while a thread reads stdout
and prints anything it finds.
This example sees stderr
redirected to stdout
, while in reality I'll have two threads looking at those streams separately and logging to the relevant log level (e.g. logging.info()
or logging.error()
).
import threading
from subprocess import CalledProcessError, TimeoutExpired, Popen, PIPE, STDOUT
from typing import Optional
def invoke_subporcess(args: list, input: Optional[str] = None, timeout: int = 300) -> Optional[int]:
    '''Run a command, feed it *input* on stdin and stream its output live.

    The child's stderr is merged into stdout, and every output line is
    printed with an ``[INFO]`` prefix as soon as it arrives. Failures are
    reported with an ``[Error]`` line on standard output instead of being
    propagated to the caller.

    Args:
        args: The command and its arguments, as accepted by ``Popen``.
        input: Text written to the child's stdin. stdin is closed afterwards
            (also when *input* is ``None``) so the child sees end-of-file.
        timeout: Seconds to wait for the process to exit before it is killed.

    Returns:
        ``0`` when the process exits successfully; ``None`` when it timed
        out, exited with a non-zero status (including being killed by a
        signal), or could not be started.
    '''

    def log_subprocess_output(stream) -> None:
        '''Print every line read from *stream* until end-of-file.'''
        for line in iter(stream.readline, ''):
            print(f'[INFO] {line.strip()}')

    def feed_stdin(stream, data) -> None:
        '''Write *data* to the child's stdin, then close it to signal EOF.'''
        try:
            try:
                if data:
                    stream.write(data)
            finally:
                stream.close()
        except OSError:
            # Broken pipe: the child exited or closed stdin before reading
            # everything. Its exit status is checked by the caller instead.
            pass

    p = None
    reader = None
    try:
        # Configure the container image action.
        p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=STDOUT, text=True)

        # The reader thread is the ONLY consumer of p.stdout. Calling
        # p.communicate() here would read (and finally close) the same pipe
        # concurrently, so lines could be swallowed before being printed.
        reader = threading.Thread(target=log_subprocess_output,
                                  name='read_stdout',
                                  kwargs={'stream': p.stdout},
                                  daemon=True)
        reader.start()

        # Feed stdin from its own thread so that a child which never reads
        # its input cannot block this function past `timeout`.
        writer = threading.Thread(target=feed_stdin,
                                  name='write_stdin',
                                  kwargs={'stream': p.stdin, 'data': input},
                                  daemon=True)
        writer.start()

        # Wait for the process, then let the reader drain the remaining output.
        p.wait(timeout=timeout)
        reader.join(timeout=timeout)

        # Any non-zero status is a failure; negative values mean the process
        # was terminated by a signal.
        if p.returncode != 0:
            raise CalledProcessError(p.returncode, ' '.join(map(str, p.args)))
        return p.returncode
    except TimeoutExpired as e:
        print(f'[Error] My process timed out: {e}')
        p.kill()
        p.wait()
    except CalledProcessError as e:
        print(f'[Error] My process failed: {e}')
    except Exception as e:
        print(f'[Error] An exception occurred: {e}')
    finally:
        if reader is not None:
            # Returns immediately when the reader has already finished.
            reader.join(timeout=timeout)
            # Only close the pipe once nobody is reading from it any more.
            if not reader.is_alive():
                p.stdout.close()
    return None
if __name__ == "__main__":
    # Example invocation: log in to a container registry, handing the
    # password to `docker login` over stdin rather than on the command line.
    # The registry host and password below are placeholders.
    registry_url = "my-container-registry.com"
    registry_password = "super-secret-password"
    login_command = [
        'docker', 'login', registry_url,
        '--username', 'AWS',
        '--password-stdin',
    ]
    invoke_subporcess(login_command, input=registry_password)