In Python (3.7+), I am trying to run a subprocess as a context manager while asynchronously streaming potentially large amounts of stdout. The problem is that I can't get the body of the context manager to run concurrently with the stdout callback. I have tried using threads, running the async function there, but then I could not figure out how to get the Process object back out into the context manager.
So the question: how can I yield the running async Process object from a context manager in the main thread? That is, in the following code I'd like open_subprocess() to yield the Process while it is still running, not after it has finished.
import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

async def stream_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(
        cmd,
        *args,
        stdout=asyncio.subprocess.PIPE)
    read = read_stream(proc, proc.stdout, stdout_callback)
    await asyncio.wait([read])
    return proc

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    proc_coroutine = stream_subprocess(
        cmd,
        *args,
        stdout_callback=stdout_callback)
    # The following blocks until proc has finished;
    # I would like to yield proc while it is running.
    proc = asyncio.run(proc_coroutine)
    yield proc
    proc.terminate()

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc:
        # The following code only runs after proc completes,
        # but I would expect these print statements to be
        # interleaved with the output from the subprocess.
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc.returncode}')
Asyncio provides concurrent execution by suspending anything that would otherwise block. For this to work, all code must run inside coroutines or callbacks and must refrain from calling blocking functions such as time.sleep(). Other than that, your code has a further issue: await asyncio.wait([x]) waits for completion just like await x does, which means that open_subprocess won't yield until all of the stream reading is done.
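If the second point isn't obvious, here is a minimal sketch (the slow coroutine is made up for illustration) showing that awaiting asyncio.wait([task]) suspends the caller until the task completes, just as awaiting the task directly would:

import asyncio

async def slow():
    await asyncio.sleep(1)
    return 42

async def main():
    task = asyncio.create_task(slow())
    # Suspends main() for the full second, exactly like `await task`;
    # the only difference is the (done, pending) return value.
    done, _pending = await asyncio.wait([task])
    print(done.pop().result())  # prints 42 after ~1 second

asyncio.run(main())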
The correct way to structure the code is to move the top-level code into an async def and use an asynchronous context manager. For example:
import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(
        cmd, *args, stdout=asyncio.subprocess.PIPE)
    # Start reading stdout in the background; don't await it here,
    # so the context manager can yield while the process runs.
    asyncio.create_task(read_stream(proc, proc.stdout, stdout_callback))
    yield proc
    if proc.returncode is None:
        proc.terminate()
    await proc.wait()

async def main():
    def stdout_callback(data):
        print('STDOUT:', data)

    async with open_subprocess('ping', '-c', '4', 'localhost',
                               stdout_callback=stdout_callback) as proc:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            await asyncio.sleep(1)
    print(f'RETURN CODE: {proc.returncode}')

asyncio.run(main())
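With this version the body of the async with block runs while ping is still producing output, so the two sets of prints interleave. The output should look something like this (the pid and the exact ping lines will vary):

RUNNING SUBPROCESS 12345...
STDOUT: PING localhost (127.0.0.1) 56(84) bytes of data.
STDOUT: 64 bytes from localhost (127.0.0.1): icmp_seq=1 ttl=64 time=0.05 ms
RUNNING SUBPROCESS 12345...
STDOUT: 64 bytes from localhost (127.0.0.1): icmp_seq=2 ttl=64 time=0.04 ms
RETURN CODE: -15

Also note that ping -c 4 needs about three seconds while the block above exits after about two, so the context manager terminates the process early and the return code reflects the SIGTERM (-15 on Linux). Sleep longer, or await proc.wait() before leaving the block, if you want the process to run to completion.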
If you insist on mixing sync and async code, you'll need to separate them completely by running the asyncio event loop in a separate thread. The main thread then cannot directly access asyncio objects like proc, because they are not thread-safe; all communication with the event loop has to go through call_soon_threadsafe and run_coroutine_threadsafe.
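In isolation, that pattern looks something like this (the add coroutine is a hypothetical stand-in for real async work):

import asyncio
import threading

# An event loop running in a background thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def add(a, b):
    await asyncio.sleep(0.1)  # stand-in for real async work
    return a + b

# run_coroutine_threadsafe returns a concurrent.futures.Future,
# which the synchronous caller can safely block on.
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
print(future.result())  # prints 5

loop.call_soon_threadsafe(loop.stop)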
That approach is complex, requiring inter-thread communication and fiddling with event loops, so I wouldn't recommend it except as a learning exercise. Not to mention that if you're using another thread anyway, you needn't bother with asyncio at all: you could issue ordinary blocking calls in the other thread. Having said that, here is a possible implementation:
import asyncio
import contextlib
import concurrent.futures
import threading

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

async def stream_subprocess(cmd, *args, proc_data_future, stdout_callback=print):
    try:
        proc = await asyncio.create_subprocess_exec(
            cmd, *args, stdout=asyncio.subprocess.PIPE)
    except Exception as e:
        proc_data_future.set_exception(e)
        raise
    # Hand the process data to the sync world as soon as it exists.
    proc_data_future.set_result({'proc': proc, 'pid': proc.pid})
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    loop = asyncio.new_event_loop()
    # Needed to use asyncio.subprocess outside the main thread.
    asyncio.get_child_watcher().attach_loop(loop)
    threading.Thread(target=loop.run_forever).start()
    proc_data_future = concurrent.futures.Future()
    loop.call_soon_threadsafe(
        loop.create_task,
        stream_subprocess(cmd, *args,
                          proc_data_future=proc_data_future,
                          stdout_callback=stdout_callback))
    # Block until the subprocess has actually started.
    proc_data = proc_data_future.result()
    yield proc_data

    async def terminate(proc):
        if proc.returncode is None:
            proc.terminate()
        await proc.wait()

    asyncio.run_coroutine_threadsafe(terminate(proc_data['proc']), loop).result()
    proc_data['returncode'] = proc_data['proc'].returncode
    loop.call_soon_threadsafe(loop.stop)

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc_data:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc_data["pid"]}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc_data["returncode"]}')
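One more caveat: the asyncio.get_child_watcher().attach_loop(loop) line is what lets asyncio manage subprocesses from an event loop running outside the main thread, as the comment notes. The child watcher API exists in the Python 3.7+ range this question targets, but keep in mind it was deprecated in Python 3.12, so this workaround is tied to older versions.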