Search code examples
pythonsubprocesspopen

subprocess.Popen: how to proper communicate in an interactive way?


I'm working on an app that requires a conversational interaction with an external process, simulated here by the terminal calculator bc: I send something to stdin, and I get the response via stdout, or stderr if an error occurs. However, the proc quits when a non-fatal error occurs.

Below is a snippet ztui.py to replicate the issue. Instead of moving on to the next stdin input, the proc just exits unexpectedly after printing to stderr, and the python program experiences the BrokenPipeError when it tries to send/flush the 3rd command as that is obviously closed.

from subprocess import Popen, PIPE

proc = Popen(["bc"], stdin=PIPE, stdout=PIPE, stderr=PIPE)

proc.stdin.write(b"2+2\n")
proc.stdin.flush()
print(proc.stdout.readline().decode())

proc.stdin.write(b"len()\n")
proc.stdin.flush()
print(proc.stdout.readline().decode())
print("stderr ==>", proc.stderr.readlines(2))

# and here the proc exits, checked using ps aux from another terminal window

proc.stdin.write(b"3+3\n")
proc.stdin.flush()
print(proc.stdout.readline().decode())

This is the output

fabio: ~/Downloads $ python3.11 --version
Python 3.11.3
fabio: ~/Downloads $ python3.11 ztui.py
4


stderr ==> [b'\n', b'Runtime error: undefined function: len()\n']
Traceback (most recent call last):
  File "/Users/fabio/Downloads/ztui.py", line 15, in <module>
    proc.stdin.flush()
BrokenPipeError: [Errno 32] Broken pipe

Needless to say, the actual bc program does not exit when you pass len(): it just shows the error and gets ready for the new input

fabio: ~/Downloads $ bc
>>> 2*2
4
>>> len()

Runtime error: undefined function: len()
    0: (main)

>>> 3*3
9
>>> quit

How can I troubleshoot that? I had initially looked into the communicate() command, but that is for a one-off request, and I need to continuously send and receive commands and responses.


Solution

  • This is really tricky to get correctly - because trying to read, even a single byte, from the wrong file will block - and there is no way to kill the blocking call.

    So, I added a working example - but you may find that the pexpect 3rd party library will do what I do here, and cover other edge cases as well - you may want to check it: https://pexpect.readthedocs.io/en/stable/index.html

    So, just to be clear, "bc" won't usually terminate - if it does, add the -i switch, to force it into interactive mode. (["bc", "-i"] args to Popen, rather than just ["bc"]).

    With that out of the way, the only thing that allows one to see if a file has pending bytes to be read and not block, is using one of the O.S. selector strategies - this is a somewhat low level API which requires some attention that even the Python wrappers for it are still a bit on the verbose side: you have to pre-register the files you want to check for pending reads, and them the return data includes all files at once, with all ready events, and their associated metadata. Cf. https://docs.python.org/3/library/selectors.html

    And since, dealing with Popen's stdin, stdout and stderr I/O is a bit verbose as well, requiring encoding text, calling "flush" and so on, I think the best approach is to wrapp the Popen process in a class that can ease the communication, taking care of a proper select call, and, since we are at it, also take care of text encoding/decoding.

    I made it for your code (but I've needed this code other times as well, expect me to add this to an easier to get "gist"):

    (You will note I added some input calls so the stages are clearly perceivable (and one can go check if the bc process is active). The idea is just to press <enter> at the prompts for each step.)

    from subprocess import Popen, PIPE
    import time, os
    import selectors
    
    
    class ProcCommunicator:
        def __init__(self, cmdargs):
            self.proc = proc = Popen(cmdargs, stdin=PIPE, stdout=PIPE, stderr=PIPE)
            self.selector = selectors.DefaultSelector()
            self.selector.register(proc.stdout.fileno(), selectors.EVENT_READ, "stdout")
            self.selector.register(proc.stderr.fileno(), selectors.EVENT_READ, "stderr")
    
        def send(self, text):
            if not text.endswith("\n"):
                text += "\n"
            self.proc.stdin.write(text.encode("utf-8"))
            self.proc.stdin.flush()
    
        def read(self, grace_time=0.02):
            raw_message = b""
            streams = {"stdout": b"", "stderr": b""}
            time.sleep(grace_time)
            while True:
                ready = self.selector.select(timeout=0)
                if not ready:
                    # no more pending bytes to be read!
                    break
                # unpack the objects returned by the select call:
                for (fileno, fd, key_event, key_data), event in ready:
                    # since there _are_ bytes pending, a read on the _raw_ file will not block. 
                    # (but reading the buffered proc.stdout Python object might block)
                    streams[key_data] += os.read(fileno, 1024)  
            for k, v in streams.items():
                streams[k] = v.decode("utf-8")
            return streams
    
    proc = ProcCommunicator(["bc"])
    
    input("step 1")
    proc.send("2 + 2")
    print(proc.read())
    
    input("step 2")
    proc.send("len()")
    print(proc.read())
    
    
    input("step 3")
    proc.send("3+3")
    print(proc.read())