I want to manage a subprocess with the subprocess module, and I need to pipe a (really) large number of lines to the child's stdin. I'm creating the input with a generator and passing it on to the subprocess like this:
def my_gen (end): # simplified example
    for i in range(0, end):
        yield f"line {i}"
with subprocess.Popen(["command", "-o", "option_value"], # simplified example
        stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr) as process:
    for line in my_gen(1e7):
        process.stdin.write(line.encode()) # This is apparently not safe
    out, err = process.communicate() # out and err will be None,
    # but this closes the process gracefully, which "with" does too
This results in a BrokenPipeError, although not every time, and not on every machine I've tried:
Traceback (most recent call last):
  File "my_script", line 170, in <module>
    process.stdin.write(line.encode())
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "path/tolib/python3.8/subprocess.py", line 171, in <module>
  File "path/tolib/python3.8/subprocess.py", line 914, in __exit__
    self.stdin.close()
BrokenPipeError: [Errno 32] Broken pipe
So, what's the safe way to pass input line by line from a generator to a subprocess?
Edit: I've been getting suggestions about using communicate, which is of course in the docs. That answers how to communicate safely, but it doesn't accept a generator as input.
Edit2: as Booboo pointed out, the example as written will raise an error (a TypeError, not the error I was seeing in my code). The call to range should be range(0, int(end)) so that my_gen can accept numbers in 1e7 notation.
First of all, if you want stdout and stderr not to be piped, then either do not specify these arguments in the Popen call at all or specify them as None, which is the default when they are omitted (but do not pass sys.stdout and sys.stderr).
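To illustrate the point about the defaults, here is a minimal sketch. The child command is a hypothetical stand-in (a Python one-liner that echoes its stdin), and the helper name run_with_inherited_output is mine, not part of the question. Only stdin is a pipe, so the child writes straight to the parent's stdout and stderr.

```python
import subprocess
import sys


def run_with_inherited_output(text):
    """Pipe `text` to a child whose stdout/stderr are inherited, not piped."""
    # Hypothetical stand-in for the real command: echoes stdin to stdout.
    child_cmd = [
        sys.executable,
        "-c",
        "import sys; sys.stdout.write(sys.stdin.read())",
    ]
    # stdout and stderr are left at their default (None), so the child
    # inherits the parent's streams; only stdin is connected to a pipe.
    process = subprocess.Popen(child_cmd, stdin=subprocess.PIPE, text=True)
    out, err = process.communicate(text)
    return process, out, err


if __name__ == "__main__":
    process, out, err = run_with_inherited_output("hello from the parent\n")
    # Nothing was captured, because nothing was piped back to the parent.
    print(out, err, process.returncode)
```

Because nothing is piped back, out and err are None, just as in the question; the child's output simply appears on the parent's terminal.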
Why not? Looking at the source of the Popen.communicate method, I can see that there is special optimized code for the case where only one of the stream arguments is non-None. When that argument is stdin, Popen.communicate simply writes the passed input string to the pipe and ignores any BrokenPipeError that might occur. But because you are passing the stdout and stderr arguments as you are, I suspect that communicate no longer takes that simple path and instead starts threads to handle the streams, and that this is what intermittently leads to your exception.
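As a rough illustration of that behaviour of communicate, the sketch below joins the generated lines into one string and hands it over in a single call. The helper name send_all_at_once is hypothetical, and joining the lines means the whole input is held in memory at once, so this is only practical if the data fits in RAM. Even if the child exits without reading anything, communicate swallows the broken pipe and returns normally.

```python
import subprocess
import sys


def send_all_at_once(cmd, lines):
    """Join the lines and pass them to the child with one communicate() call."""
    data = "".join(lines)  # materializes the whole input in memory
    with subprocess.Popen(cmd, stdin=subprocess.PIPE, text=True) as process:
        # With only stdin piped, communicate() writes the data, closes stdin,
        # and ignores a BrokenPipeError if the child stops reading early.
        process.communicate(data)
    return process.returncode


if __name__ == "__main__":
    # Hypothetical child that exits immediately without reading its stdin.
    early_exit_cmd = [sys.executable, "-c", "import sys; sys.exit(3)"]
    lines = (f"line {i}\n" for i in range(200_000))
    print(send_all_at_once(early_exit_cmd, lines))
```

This does not solve the generator problem from the question, since the input is no longer streamed; it only shows the error handling that communicate provides.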
Now, I believe that you can perform your writes without using communicate and simply ignore the BrokenPipeError yourself. When I tried the following code (substituting my own command for the one executed by Popen, one that writes whatever is piped in to a file, and using text mode), I did not, in fact, encounter any BrokenPipeError exceptions (nor do I expect to with stdout and stderr set properly). So I can't swear that the output will still be correct if such an exception does occur.
As an aside, the range built-in function does not accept a float object (at least not for me), so I don't know how you are able to specify 1e7.
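A short sketch of the range issue, assuming you do want to keep passing values such as 1e7: converting with int() inside the generator is enough. The newline in the yielded string is my choice, not part of the original generator.

```python
def my_gen(end):
    # int() accepts floats such as 1e7; range() itself does not.
    for i in range(int(end)):
        yield f"line {i}\n"


try:
    range(0, 1e7)  # a float argument is rejected
except TypeError as exc:
    print(f"range rejected the float: {exc}")

# With the conversion in place, scientific notation works as an argument.
print(sum(1 for _ in my_gen(1e3)))
```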
I have also modified the code to add terminating newline characters at the end of each line and to process in text mode, but you should not feel constrained to do so.
import subprocess
import sys
def my_gen (end): # simplified example
    for i in range(0, end):
        yield f"line {i}\n"
with subprocess.Popen(["command", "-o", "option_value"], stdin=subprocess.PIPE, text=True) as process: # simplified example
    for line in my_gen(10_000_000):
        try:
            process.stdin.write(line)
        except BrokenPipeError as e:
            pass
    out, err = process.communicate()
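A possible variation on the code above, offered as a sketch rather than a tested fix for your command: stop generating input as soon as the pipe breaks instead of attempting every remaining write. The helper name feed_lines and the child commands in the usage part are hypothetical, and I am assuming POSIX behaviour, where a closed pipe surfaces as BrokenPipeError.

```python
import subprocess
import sys


def feed_lines(cmd, lines):
    """Stream lines to the child's stdin, stopping once the pipe breaks."""
    with subprocess.Popen(cmd, stdin=subprocess.PIPE, text=True) as process:
        try:
            for line in lines:
                process.stdin.write(line)
        except BrokenPipeError:
            # The child closed its end of the pipe; further writes are useless.
            pass
        # With no input given, communicate() just closes stdin (ignoring a
        # broken pipe while flushing) and waits for the child to exit.
        process.communicate()
    return process.returncode


if __name__ == "__main__":
    # Hypothetical child that counts the lines it receives.
    count_cmd = [
        sys.executable,
        "-c",
        "import sys; n = sum(1 for _ in sys.stdin); sys.exit(0 if n == 1000 else 1)",
    ]
    print(feed_lines(count_cmd, (f"line {i}\n" for i in range(1000))))
```

Calling communicate before leaving the with block matters here: it closes stdin while tolerating the broken pipe, so the close performed when the block exits has nothing left to flush.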