I'm working on an app that requires a conversational interaction with an external process, simulated here by the terminal calculator bc: I send something to stdin, and I get the response via stdout, or stderr if an error occurs. However, the process quits when a non-fatal error occurs.
Below is a snippet, ztui.py, that replicates the issue. Instead of moving on to the next stdin input, the process just exits unexpectedly after printing to stderr, and the Python program gets a BrokenPipeError when it tries to send/flush the third command, because the pipe is closed by then.
from subprocess import Popen, PIPE
proc = Popen(["bc"], stdin=PIPE, stdout=PIPE, stderr=PIPE)
proc.stdin.write(b"2+2\n")
proc.stdin.flush()
print(proc.stdout.readline().decode())
proc.stdin.write(b"len()\n")
proc.stdin.flush()
print(proc.stdout.readline().decode())
print("stderr ==>", proc.stderr.readlines(2))
# and here the proc exits, checked using ps aux from another terminal window
proc.stdin.write(b"3+3\n")
proc.stdin.flush()
print(proc.stdout.readline().decode())
This is the output
fabio: ~/Downloads $ python3.11 --version
Python 3.11.3
fabio: ~/Downloads $ python3.11 ztui.py
4
stderr ==> [b'\n', b'Runtime error: undefined function: len()\n']
Traceback (most recent call last):
  File "/Users/fabio/Downloads/ztui.py", line 15, in <module>
    proc.stdin.flush()
BrokenPipeError: [Errno 32] Broken pipe
Needless to say, the actual bc program does not exit when you pass len(): it just shows the error and gets ready for new input.
fabio: ~/Downloads $ bc
>>> 2*2
4
>>> len()
Runtime error: undefined function: len()
0: (main)
>>> 3*3
9
>>> quit
How can I troubleshoot that? I had initially looked into the communicate() method, but that is for a one-off request, and I need to continuously send commands and receive responses.
This is really tricky to get right, because trying to read even a single byte from the wrong file will block, and there is no way to cancel the blocking call.
So, I added a working example, but you may find that the pexpect third-party library does what I do here and covers other edge cases as well. You may want to check it: https://pexpect.readthedocs.io/en/stable/index.html
So, just to be clear, "bc" won't usually terminate on an error like this. If it does, add the -i switch to force it into interactive mode (pass ["bc", "-i"] to Popen, rather than just ["bc"]).
With that out of the way, the only thing that lets one see whether a file has pending bytes, so that a read will not block, is one of the O.S. selector strategies. This is a somewhat low-level API that requires some attention, and even the Python wrappers for it are still a bit on the verbose side: you have to pre-register the files you want to check for pending reads, and then the returned data includes all ready files at once, with their events and associated metadata. Cf. https://docs.python.org/3/library/selectors.html
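To make the register-then-select flow concrete, here is a minimal sketch. It assumes a POSIX system (on Windows, select only works on sockets), and it uses two plain os.pipe() pairs as stand-ins for a child's stdout and stderr, so it runs without any external program. The helper name poll_once and the "stdout"/"stderr" labels are just illustrative choices, not part of the selectors API.

```python
import os
import selectors

# Two plain OS pipes stand in for a child's stdout and stderr
# (hypothetical stand-ins; a real program would use proc.stdout / proc.stderr).
out_r, out_w = os.pipe()
err_r, err_w = os.pipe()

selector = selectors.DefaultSelector()
# Pre-register each read end. The third argument is free-form data that
# comes back attached to every event reported for that descriptor.
selector.register(out_r, selectors.EVENT_READ, "stdout")
selector.register(err_r, selectors.EVENT_READ, "stderr")


def poll_once(timeout=0):
    """Return {label: bytes} for each registered descriptor with pending data."""
    pending = {}
    # select() returns a list of (SelectorKey, event_mask) pairs,
    # one for every descriptor that is ready right now.
    for key, _events in selector.select(timeout=timeout):
        # The descriptor is reported as readable, so this raw read will not block.
        pending[key.data] = os.read(key.fd, 1024)
    return pending


print(poll_once())            # nothing has been written yet, so nothing is ready
os.write(err_w, b"oops\n")
print(poll_once())            # only the pipe labelled "stderr" is reported
```

With timeout=0 the select call returns immediately, which is what makes it usable as a "is there anything to read?" check; passing a positive timeout instead would wait up to that long for the first descriptor to become ready.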
And since dealing with Popen's stdin, stdout and stderr I/O is a bit verbose as well, requiring encoding text, calling "flush" and so on, I think the best approach is to wrap the Popen process in a class that eases the communication, taking care of a proper select call and, while we are at it, of text encoding/decoding.
I made it for your code (but I've needed this code other times as well, expect me to add this to an easier to get "gist"):
(You will note I added some input calls so the stages are clearly perceivable, and so one can go check whether the bc process is still active. The idea is just to press <enter> at the prompt for each step.)
from subprocess import Popen, PIPE
import os
import selectors
import time


class ProcCommunicator:
    def __init__(self, cmdargs):
        self.proc = proc = Popen(cmdargs, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        self.selector = selectors.DefaultSelector()
        self.selector.register(proc.stdout.fileno(), selectors.EVENT_READ, "stdout")
        self.selector.register(proc.stderr.fileno(), selectors.EVENT_READ, "stderr")

    def send(self, text):
        if not text.endswith("\n"):
            text += "\n"
        self.proc.stdin.write(text.encode("utf-8"))
        self.proc.stdin.flush()

    def read(self, grace_time=0.02):
        streams = {"stdout": b"", "stderr": b""}
        # give the child process a moment to produce its response
        time.sleep(grace_time)
        while True:
            ready = self.selector.select(timeout=0)
            if not ready:
                # no more pending bytes to be read!
                break
            # unpack the objects returned by the select call:
            for (fileno, fd, key_event, key_data), event in ready:
                # since there _are_ bytes pending, a read on the _raw_ file will not block.
                # (but reading the buffered proc.stdout Python object might block)
                chunk = os.read(fileno, 1024)
                if not chunk:
                    # EOF: the child closed this stream. Stop polling it,
                    # otherwise select would report it as ready forever.
                    self.selector.unregister(fileno)
                    continue
                streams[key_data] += chunk
        for k, v in streams.items():
            streams[k] = v.decode("utf-8")
        return streams


proc = ProcCommunicator(["bc"])
input("step 1")
proc.send("2 + 2")
print(proc.read())
input("step 2")
proc.send("len()")
print(proc.read())
input("step 3")
proc.send("3+3")
print(proc.read())