I have a sync generator function that I can't change (since it's in a library) that itself accepts a sync iterable.
But I would like to use it from an async context, passing it an async iterable, and without blocking the event loop during iteration. How can I do this?
For example, say we have `my_gen`, which takes an iterable of integers and is designed to work in a sync context:
sync_input_iterable = range(0, 10000)
sync_output_iterable = my_gen(sync_input_iterable)
for v in sync_output_iterable:
    print(v)
But I would like to use it like this:
import asyncio

async def main():
    # Just for example purposes - there is no "arange" built into Python
    async def arange(start, end):
        for i in range(start, end):
            yield i
            await asyncio.sleep(0)

    async_input_iterable = arange(0, 10000)
    async_output_iterable = # Something to do with `my_gen`
    async for v in async_output_iterable:
        print(v)

asyncio.run(main())
What could the `# Something...` be to make this work?
The `# Something...` placeholder could have two components:

- A to_async_iterable function to convert the output of the generator function to an async iterable. This works by iterating over it in a thread using asyncio.to_thread, which avoids blocking the event loop.
- A to_sync_iterable function to convert the input async iterable to a sync iterable to be passed to the generator function. This works by iterating over it from sync code using asyncio.run_coroutine_threadsafe, which makes sense since this is running from a different thread because of to_async_iterable.

import asyncio
def to_sync_iterable(async_iterable, loop):
    # Must be iterated from a thread other than the one running `loop`,
    # since .result() blocks until the loop has produced the next value
    done = object()
    async_it = aiter(async_iterable)
    while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done:
        yield value

async def to_async_iterable(sync_iterable):
    # to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end
    done = object()
    it = iter(sync_iterable)
    while (value := await asyncio.to_thread(next, it, done)) is not done:
        yield value
Used for example as follows:
async def main():
    async def arange(start, end):
        for i in range(start, end):
            yield i
            await asyncio.sleep(0)

    # For example purposes
    def my_gen(iterable):
        for a in iterable:
            yield a * 2

    async_input_iterable = arange(0, 10000)
    sync_input_iterable = to_sync_iterable(async_input_iterable, asyncio.get_running_loop())
    sync_output_iterable = my_gen(sync_input_iterable)
    async_output_iterable = to_async_iterable(sync_output_iterable)
    async for v in async_output_iterable:
        print(v)

asyncio.run(main())
This is inspired by the code at https://github.com/uktrade/stream-zip/issues/87#issuecomment-1695123135 that does something similar to wrap stream-zip to make it work in an async context.
I can't really speak to its performance.