
Convert a sync generator function that takes a sync iterable to an async generator function that takes an async iterable


I have a sync generator function that I can't change (since it's in a library) and that itself accepts a sync iterable.

But I would like to use it from an async context, passing it an async iterable, and without blocking the event loop during iteration. How can I do this?

For example, say we have my_gen, which takes an iterable of integers and is designed to work in a sync context:

sync_input_iterable = range(0, 10000)
sync_output_iterable = my_gen(sync_input_iterable)
for v in sync_output_iterable:
    print(v)

But I would like to use it like this:

import asyncio

async def main():
    # Just for example purposes - there is no "arange" built into Python
    async def arange(start, end):
        for i in range(start, end):
            yield i
            await asyncio.sleep(0)

    async_input_iterable = arange(0, 10000)
    async_output_iterable = # Something to do with `my_gen`
    async for v in async_output_iterable:
        print(v)

asyncio.run(main())

What could the # Something... be to make this work?


Solution

  • The # Something... placeholder can be built from two components:

    1. A to_async_iterable function that converts the output of the generator function to an async iterable. It iterates over the sync iterable in a worker thread using asyncio.to_thread, which avoids blocking the event loop.
    2. A to_sync_iterable function that converts the input async iterable to a sync iterable that can be passed to the generator function. It iterates over the async iterable from sync code using asyncio.run_coroutine_threadsafe, which is appropriate because, thanks to to_async_iterable, the generator function runs in a different thread from the event loop.
    import asyncio
    
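    def to_sync_iterable(async_iterable, loop):
        # Consumed from a worker thread: each item is fetched by scheduling anext() on the event loop.
        # aiter() and anext() are built-ins from Python 3.10 onwards.
        done = object()
        async_it = aiter(async_iterable)
        while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done:
            yield value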
    
    async def to_async_iterable(sync_iterable):
        # to_thread errors if StopIteration is raised inside it, so a sentinel is used to detect the end
        done = object()
        it = iter(sync_iterable)
        while (value := await asyncio.to_thread(next, it, done)) is not done:
            yield value
    

    Used for example as follows:

    async def main():
        async def arange(start, end):
            for i in range(start, end):
                yield i
                await asyncio.sleep(0)
    
        # For example purposes
        def my_gen(iterable):
            for a in iterable:
                yield a * 2
    
        async_input_iterable = arange(0, 10000)
        sync_input_iterable = to_sync_iterable(async_input_iterable, asyncio.get_running_loop())
        sync_output_iterable = my_gen(sync_input_iterable)
        async_output_iterable = to_async_iterable(sync_output_iterable)
        async for v in async_output_iterable:
            print(v)
    
    asyncio.run(main())
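If the conversion is needed in more than one place, the two steps can be folded into a single wrapper. The following is only a sketch: to_async_gen_func is a hypothetical name, not part of asyncio or any library, and my_gen is the same stand-in generator as in the usage example. To keep the block self-contained, the helpers are restated here, with to_sync_iterable written using an explicit coroutine and the __aiter__/__anext__ methods instead of the aiter/anext built-ins, which should also let it run on Python 3.9.

```python
import asyncio


def to_sync_iterable(async_iterable, loop):
    # Variation on the helper above: an explicit coroutine maps the end of
    # the async iterable to a sentinel instead of using anext(it, default).
    done = object()
    async_it = async_iterable.__aiter__()

    async def next_or_done():
        try:
            return await async_it.__anext__()
        except StopAsyncIteration:
            return done

    while (value := asyncio.run_coroutine_threadsafe(next_or_done(), loop).result()) is not done:
        yield value


async def to_async_iterable(sync_iterable):
    done = object()
    it = iter(sync_iterable)
    while (value := await asyncio.to_thread(next, it, done)) is not done:
        yield value


def to_async_gen_func(sync_gen_func):
    # Hypothetical helper: wraps a sync generator function so that it
    # accepts an async iterable and returns an async iterable.
    async def async_gen_func(async_iterable, *args, **kwargs):
        loop = asyncio.get_running_loop()
        sync_input = to_sync_iterable(async_iterable, loop)
        # Calling a generator function only creates the generator; its body
        # runs later, inside the worker thread used by to_async_iterable.
        sync_output = sync_gen_func(sync_input, *args, **kwargs)
        async for value in to_async_iterable(sync_output):
            yield value

    return async_gen_func


# Stand-in for the library generator function, for example purposes
def my_gen(iterable):
    for a in iterable:
        yield a * 2


async def arange(start, end):
    for i in range(start, end):
        yield i
        await asyncio.sleep(0)


async def demo():
    async_my_gen = to_async_gen_func(my_gen)
    return [v async for v in async_my_gen(arange(0, 5))]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The wrapper assumes the async iterable is the first positional argument of the generator function. A library function with a different signature would need the argument handling adjusted.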
    

    The two helper functions are inspired by the code at https://github.com/uktrade/stream-zip/issues/87#issuecomment-1695123135 that does something similar to wrap stream-zip to make it work in an async context.

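    I can't really speak to the performance of this approach. Every value crosses between the event loop and a worker thread, so there is some per-item overhead, but I haven't measured it.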
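One thing that can be checked without a benchmark is whether the event loop really stays responsive while the sync generator does blocking work. The sketch below assumes a hypothetical slow_gen that blocks in time.sleep for each item, and runs a ticker task alongside it. If to_async_iterable blocked the loop, the ticker would not get a chance to run.

```python
import asyncio
import time


async def to_async_iterable(sync_iterable):
    done = object()
    it = iter(sync_iterable)
    while (value := await asyncio.to_thread(next, it, done)) is not done:
        yield value


# Hypothetical stand-in for a library generator that blocks on each item
def slow_gen(n):
    for i in range(n):
        time.sleep(0.05)
        yield i


async def demo():
    ticks = 0

    async def ticker():
        # Counts how often the event loop gets to run this task
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    ticker_task = asyncio.create_task(ticker())
    values = [v async for v in to_async_iterable(slow_gen(5))]

    ticker_task.cancel()
    try:
        await ticker_task
    except asyncio.CancelledError:
        pass
    return values, ticks


if __name__ == "__main__":
    values, ticks = asyncio.run(demo())
    print(values, "ticks while iterating:", ticks)
```

A non-zero tick count only shows that other tasks made progress during iteration. It says nothing about throughput, which would need a proper measurement.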