python subprocess python-asyncio

Watch stdout and stderr of a subprocess simultaneously


How can I watch standard output and standard error of a long-running subprocess simultaneously, processing each line as soon as it is generated by the subprocess?

I don't mind using Python 3.6's async tools to write what I expect to be non-blocking async loops over each of the two streams, but that alone doesn't seem to solve the problem. The code below:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()

outputs:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1

While I expect it to output something like:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2

Solution

  • The two loops in your code run one after the other, so stderr is not read until stdout reaches EOF. To process both streams as lines arrive, you need a function that takes two async sequences and merges them, producing results from either one as they become available. With such a function available, run could look like this:

    async def run(cmd):
        p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
        async for f in merge(p.stdout, p.stderr):
            print(datetime.now(), f.decode().strip())
    

    A function like merge does not (yet) exist in the standard library, but the aiostream external library provides one. You can also write your own using an async generator and asyncio.wait():

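    async def merge(*iterables):
        # Map each async iterator to its pending __anext__() future (None = not scheduled yet).
        iter_next = {it.__aiter__(): None for it in iterables}
        while iter_next:
            # Schedule the next item for every iterator that has no pending future.
            for it, it_next in iter_next.items():
                if it_next is None:
                    fut = asyncio.ensure_future(it.__anext__())
                    fut._orig_iter = it  # remember which iterator this future belongs to
                    iter_next[it] = fut
            # Wait until at least one iterator has produced an item or finished.
            done, _ = await asyncio.wait(iter_next.values(),
                                         return_when=asyncio.FIRST_COMPLETED)
            for fut in done:
                iter_next[fut._orig_iter] = None
                try:
                    ret = fut.result()
                except StopAsyncIteration:
                    # This iterator is exhausted; stop polling it.
                    del iter_next[fut._orig_iter]
                    continue
                yield ret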
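
For comparison, the same "yield whichever item arrives first" behaviour can also be sketched with one pump task per source feeding a shared asyncio.Queue. This is not the answer's implementation, only an alternative under stated assumptions: the names merge_via_queue, pump, ticker and demo are hypothetical, and asyncio.run needs Python 3.7 or later. The timed ticker generators stand in for the subprocess streams so the interleaving can be checked without a child process.

```python
import asyncio


async def merge_via_queue(*iterables):
    # Hypothetical alternative to merge(): every source gets its own pump
    # task, and all pumps push their items into one shared queue.
    queue = asyncio.Queue()
    end = object()  # sentinel meaning "one source has finished"

    async def pump(it):
        try:
            async for item in it:
                queue.put_nowait(item)
        finally:
            # Always signal completion, even if the source raised.
            queue.put_nowait(end)

    tasks = [asyncio.ensure_future(pump(it)) for it in iterables]
    try:
        remaining = len(tasks)
        while remaining:
            item = await queue.get()
            if item is end:
                remaining -= 1
            else:
                yield item
        # Re-raise any exception that occurred inside a pump.
        await asyncio.gather(*tasks)
    finally:
        # If the consumer stops early, do not leave pumps running.
        for task in tasks:
            task.cancel()


async def ticker(name, delay, count):
    # Stand-in for a stream: emits name0, name1, ... every `delay` seconds.
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def demo():
    merged = merge_via_queue(ticker("a", 0.2, 3), ticker("b", 0.5, 1))
    return [item async for item in merged]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With these delays, the single "b" item (due at about 0.5 s) should land between the second and third "a" items, which is the arrival-order interleaving the question asks for.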
    

    The above run will still differ from your desired output in one detail: it will not distinguish between output and error lines. This is easily fixed by tagging each line with an indicator of the stream it came from:

    async def decorate_with(it, prefix):
        async for item in it:
            yield prefix, item
    
    async def run(cmd):
        p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
        async for is_out, line in merge(decorate_with(p.stdout, True),
                                        decorate_with(p.stderr, False)):
            if is_out:
                print(datetime.now(), line.decode().strip())
            else:
                print(datetime.now(), "E:", line.decode().strip())
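
If the lines only need to be handled as they arrive, rather than consumed as one merged sequence, a simpler alternative to merge is to give each stream its own reader coroutine and run both with asyncio.gather(). The sketch below is not the approach described above; it assumes Python 3.7 or later (for asyncio.run) and a POSIX shell, and the names watch and lines are illustrative only.

```python
import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def watch(stream, prefix, lines):
    # Read one stream line by line until EOF, handling each line on arrival.
    async for raw in stream:
        text = prefix + raw.decode().rstrip()
        lines.append(text)  # record arrival order across both streams
        print(datetime.now(), text)


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    lines = []
    # gather() runs both readers concurrently, so neither stream has to
    # reach EOF before the other one is read.
    await asyncio.gather(
        watch(p.stdout, "", lines),
        watch(p.stderr, "E: ", lines),
    )
    await p.wait()
    return lines


if __name__ == "__main__":
    asyncio.run(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
```

The trade-off is that the processing logic lives inside each reader instead of in a single async for loop; merge is the better fit when downstream code needs one combined sequence.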