
Safely pass input line by line (from a generator) to a subprocess's stdin in Python


I want to manage a subprocess with the subprocess module, and I need to pipe a (really) large number of lines to the child's stdin. I'm creating the input with a generator and passing it to the subprocess like this:

def my_gen (end): # simplified example
  for i in range(0, end):
    yield f"line {i}"

with subprocess.Popen(["command", "-o", "option_value"], # simplified example
  stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr) as process:
  for line in my_gen(1e7):
    process.stdin.write(line.encode()) # This is apparently not safe
  out, err = process.communicate() # out and err will be None, 
  # but this closes the process gracefully, which "with" does too

This results in a BrokenPipeError, although it doesn't happen every time or on every machine I've tried:

Traceback (most recent call last):
  File "my_script", line 170, in <module>
    process.stdin.write(line.encode())
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "path/tolib/python3.8/subprocess.py", line 171, in <module>
  File "path/tolib/python3.8/subprocess.py", line 914, in __exit__
    self.stdin.close()
BrokenPipeError: [Errno 32] Broken pipe

So, what's the safe way to pass input line by line from a generator to a subprocess?

Edit: I've been getting suggestions about using communicate, which is of course in the docs. That answers how to communicate safely, but communicate doesn't accept a generator as input.

Edit2: As Booboo pointed out, the example as posted throws a runtime error (not the one I was seeing in my code). The call to range should be range(0, int(end)) so that my_gen can accept numbers written in 1e7 notation.
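To make the limitation in the first edit concrete: communicate accepts a single str or bytes object as input, so a generator would have to be joined first, which builds the entire input in memory. A minimal sketch, assuming the current Python interpreter as a stand-in child that counts the lines it receives (the child command and the small line count are illustrative, not taken from the original script):

```python
import subprocess
import sys

def my_gen(end):
    # int() lets callers pass values written as 1e3, 1e7, and so on
    for i in range(0, int(end)):
        yield f"line {i}\n"

# Hypothetical stand-in child: prints how many lines arrived on its stdin.
child = [sys.executable, "-c", "import sys; print(sum(1 for _ in sys.stdin))"]

# communicate() wants one string, so the generator is joined up front.
# For really large inputs this holds every line in memory at once.
payload = "".join(my_gen(1e3))

with subprocess.Popen(child, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True) as process:
    out, _ = process.communicate(input=payload)

line_count = int(out)
```

This works for small inputs, but it gives up the main benefit of a generator, which is why a line-by-line approach is being asked for.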


Solution

  • First of all, if you do not want stdout and stderr to be piped, either omit these arguments from the Popen call or pass None, which is the default. Do not pass sys.stdout and sys.stderr.

    Why not? Looking at the source of the Popen.communicate method, I can see special optimized code for the case where only one of the three stream arguments is non-None. When that argument is stdin, Popen.communicate simply writes the passed input string to the pipe and ignores any BrokenPipeError that might occur. By passing the stdout and stderr arguments as you are, I suspect that communicate no longer takes this path and instead starts threads to handle the processing, and that this is what intermittently leads to your exception.

    I believe you can perform your writes without communicate and simply ignore the BrokenPipeError yourself. When I tried the following code (in text mode, substituting my own command that writes whatever is piped in to a file), I did not encounter any BrokenPipeError exceptions, nor would I expect to with stdout and stderr set properly. As a result, I can't swear that the output will still be correct if such an exception does occur.

    As an aside, the range built-in function does not take a float object (at least not for me), so I don't know how you are able to specify 1e7.

    I have also modified the code to add terminating newline characters at the end of each line and to process in text mode, but you should not feel constrained to do so.

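    import subprocess

    def my_gen(end): # simplified example
        for i in range(0, end):
            yield f"line {i}\n"

    # Only stdin is piped; stdout and stderr are inherited from the parent.
    with subprocess.Popen(["command", "-o", "option_value"], stdin=subprocess.PIPE, text=True) as process: # simplified example
        for line in my_gen(10_000_000):
            try:
                process.stdin.write(line)
            except BrokenPipeError:
                # The child closed its end of the pipe; further writes would fail too.
                break
        # Closes stdin and waits for the child; out and err are None because they are not piped.
        out, err = process.communicate()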
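
The same idea can be wrapped in a small helper. This is only a sketch under stated assumptions: feed_lines is a hypothetical name, the child is the current Python interpreter running a one-liner that reads a single line and exits, and BrokenPipeError is assumed to be the error raised when the child goes away (the POSIX behavior; other platforms may report a different OSError).

```python
import subprocess
import sys

def feed_lines(cmd, lines):
    """Write text lines to cmd's stdin, stopping if the child closes the pipe.

    Returns (returncode, written), where written counts the lines accepted
    by write() before an error; because of buffering, some of those lines
    may never have reached the child.
    """
    written = 0
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, text=True)
    try:
        for line in lines:
            process.stdin.write(line)
            written += 1
    except BrokenPipeError:
        pass  # the child stopped reading; nothing more can be delivered
    finally:
        try:
            process.stdin.close()  # flushes buffered data, which may fail again
        except BrokenPipeError:
            pass
        process.wait()
    return process.returncode, written

# Hypothetical stand-in child: reads one line from stdin, then exits.
early_exit_child = [sys.executable, "-c", "import sys; sys.stdin.readline()"]
returncode, written = feed_lines(
    early_exit_child, (f"line {i}\n" for i in range(1_000_000))
)
```

Stopping at the first BrokenPipeError avoids running the generator to the end for a child that is no longer listening. Whether an early exit by the child counts as success is left to the caller, who can inspect the return code.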