How can I watch standard output and standard error of a long-running subprocess simultaneously, processing each line as soon as it is generated by the subprocess?
I don't mind using Python 3.6's async tools to write what I expect to be non-blocking async loops over each of the two streams, but that doesn't seem to solve the problem. The following code:
import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime
async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
echo "Out 1";
sleep 1;
echo "Err 1" >&2;
sleep 1;
echo "Out 2"
'''))
    loop.close()
outputs:
2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1
I expected it to print something like this instead:
2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2
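The order follows from the two loops running one after the other: the first async for keeps reading stdout until it reaches EOF (here, when the shell exits), so stderr is not read at all before then. With enough stderr output the child could even block on a full stderr pipe while the parent waits on stdout. A minimal subprocess-free sketch of that consumption order, using hypothetical fake_stream generators in place of the pipes and assuming Python 3.7+ for asyncio.run. Unlike a real pipe, a generator produces nothing until it is iterated, so this models only the order in which items are consumed:

```python
import asyncio


async def fake_stream(name, delays):
    # Hypothetical stand-in for a pipe: yields "<name> 1", "<name> 2", ...
    # after sleeping for each delay in turn.
    for i, delay in enumerate(delays, 1):
        await asyncio.sleep(delay)
        yield f"{name} {i}"


async def sequential(out, err):
    # Same shape as the question's run(): the second loop cannot start
    # until the first iterator is exhausted.
    seen = []
    async for line in out:
        seen.append(line)
    async for line in err:
        seen.append(line)
    return seen


if __name__ == "__main__":
    # In the real case "Err 1" is written between the two "Out" lines,
    # but it is only read after stdout is exhausted.
    print(asyncio.run(sequential(fake_stream("Out", [0, 0.2]),
                                 fake_stream("Err", [0.1]))))
    # prints ['Out 1', 'Out 2', 'Err 1']
```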
The problem is that the first async for loop reads stdout all the way to EOF before the second loop ever looks at stderr. To fix this, you need a function that takes two async sequences and merges them, yielding items from either one as they become available. With such a function at hand, the run coroutine could look like this:
async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())
A function like merge does not (yet) exist in the standard library, but the external aiostream library provides one. You can also write your own using an async generator and asyncio.wait():
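async def merge(*iterables):
    # Map each async iterator to its pending __anext__() future (None = nothing pending).
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        # Request the next item from every iterator that has no request pending.
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        # Wait until at least one iterator has produced an item (or finished).
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                # This iterator is exhausted; stop tracking it.
                del iter_next[fut._orig_iter]
                continue
            yield ret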
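To check that merge really yields items in completion order, it can be fed two timed generators. The ticker producer and collect helper below are hypothetical illustrations, not part of the answer; merge is copied unchanged so the sketch is self-contained, and asyncio.run assumes Python 3.7+:

```python
import asyncio


async def merge(*iterables):
    # Unchanged from above: one pending __anext__() future per iterator.
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret


async def ticker(name, delays):
    # Hypothetical producer: yields "<name> 1", "<name> 2", ... after each delay.
    for i, delay in enumerate(delays, 1):
        await asyncio.sleep(delay)
        yield f"{name} {i}"


async def collect():
    # "Err 1" becomes ready between "Out 1" and "Out 2", and merge yields it there.
    items = []
    async for item in merge(ticker("Out", [0, 0.4]), ticker("Err", [0.2])):
        items.append(item)
    return items


if __name__ == "__main__":
    print(asyncio.run(collect()))  # ['Out 1', 'Err 1', 'Out 2']
```

Because both iterators always have a request in flight, neither one can starve the other, which is exactly what the two sequential loops in the question lacked.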
The above run will still differ from your desired output in one detail: it does not distinguish between output and error lines. This is easily fixed by decorating each line with an indicator:
async def decorate_with(it, prefix):
    async for item in it:
        yield prefix, item

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for is_out, line in merge(decorate_with(p.stdout, True),
                                    decorate_with(p.stderr, False)):
        if is_out:
            print(datetime.now(), line.decode().strip())
        else:
            print(datetime.now(), "E:", line.decode().strip())
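A different technique that avoids setting the private _orig_iter attribute on futures is a producer/consumer queue: start one reader task per pipe and have both feed a single asyncio.Queue, tagging each line with its source so decorate_with is not needed. This is a swapped-in alternative to merge, not the answer's method. The names pump and watch and the CHILD script (a Python child standing in for the question's shell snippet) are hypothetical, and asyncio.create_task/asyncio.run assume Python 3.7+:

```python
import asyncio
import sys
from asyncio.subprocess import PIPE
from datetime import datetime

# Hypothetical child program standing in for the question's shell snippet.
# flush=True makes each line leave the child's buffer immediately.
CHILD = (
    "import sys, time\n"
    "print('Out 1', flush=True)\n"
    "time.sleep(0.5)\n"
    "print('Err 1', file=sys.stderr, flush=True)\n"
    "time.sleep(0.5)\n"
    "print('Out 2', flush=True)\n"
)


async def pump(stream, tag, queue):
    # Copy every line of one pipe into the shared queue, tagged with its
    # source, then put a None sentinel to signal EOF on this stream.
    async for line in stream:
        await queue.put((tag, line))
    await queue.put((tag, None))


async def watch(argv):
    p = await asyncio.create_subprocess_exec(*argv, stdout=PIPE, stderr=PIPE)
    queue = asyncio.Queue()
    readers = [asyncio.create_task(pump(p.stdout, "out", queue)),
               asyncio.create_task(pump(p.stderr, "err", queue))]
    lines = []
    open_streams = len(readers)
    while open_streams:
        tag, line = await queue.get()
        if line is None:  # one pipe reached EOF
            open_streams -= 1
            continue
        text = line.decode().strip()
        if tag == "out":
            print(datetime.now(), text)
        else:
            print(datetime.now(), "E:", text)
        lines.append((tag, text))
    await asyncio.gather(*readers)
    await p.wait()  # reap the child once both pipes are closed
    return lines


if __name__ == "__main__":
    asyncio.run(watch([sys.executable, "-c", CHILD]))
```

Each reader awaits only its own pipe, so a line on either stream is handled as soon as the child writes it, and the sentinels tell the consumer when both pipes are done. Whichever approach you use, lines only arrive as fast as the child flushes them; many programs fully buffer stdout when it is a pipe rather than a terminal.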