Search code examples
async-awaitpython-asyncioconcurrent.futures

Synchronous generator in asyncio


I have the following scenario:

  1. I have a blocking, synchronous generator
  2. I have an non-blocking, async function

I would like to run blocking generator (executed in a ThreadPool) and the async function on the event loop. How do I achieve this?

The following function simply prints the output from the generator, not from sleep function.

Thanks!

from concurrent.futures import ThreadPoolExecutor

import numpy as np
import asyncio
import time


def f():
    while True:
        r = np.random.randint(0, 3)
        time.sleep(r)
        yield r


async def gen():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    gen = await loop.run_in_executor(executor, f)
    for item in gen:
        print(item)
        print('Inside generator')


async def sleep():
    while True:
        await asyncio.sleep(1)
        print('Inside async sleep')


async def combine():
    await asyncio.gather(sleep(), gen())


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(combine())


if __name__ == '__main__':
    main()

Solution

  • run_in_executor doesn't work on generators because it is designed for blocking functions. While a generator is a valid function, it returns immediately when called, providing an object that the caller is supposed to exhaust through repeated invocations of next. (This is what Python's for loop does under the hood.) To use a blocking generator from async code, you have two choices:

    • wrap each step of the iteration (each individual call to next) in a separate call to run_in_executor, or
    • start a for loop in a separate thread and use a queue to transfer the objects to an async consumer.

    Either approach can be abstracted into a function that accepts an iterator and returns an equivalent async iterator. This is an implementation of the second approach:

    import asyncio, threading
    
    def async_wrap_iter(it):
        """Wrap blocking iterator into an asynchronous one"""
        loop = asyncio.get_event_loop()
        q = asyncio.Queue(1)
        exception = None
        _END = object()
    
        async def yield_queue_items():
            while True:
                next_item = await q.get()
                if next_item is _END:
                    break
                yield next_item
            if exception is not None:
                # the iterator has raised, propagate the exception
                raise exception
    
        def iter_to_queue():
            nonlocal exception
            try:
                for item in it:
                    # This runs outside the event loop thread, so we
                    # must use thread-safe API to talk to the queue.
                    asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
            except Exception as e:
                exception = e
            finally:
                asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()
    
        threading.Thread(target=iter_to_queue).start()
        return yield_queue_items()
    

    It can be tested with a trivial sync iterator that uses time.time() to block and an async heartbeat function to prove that the event loop is running:

    # async_wrap_iter definition as above
    
    import time
    
    def test_iter():
        for i in range(5):
            yield i
            time.sleep(1)
    
    async def test():
        ait = async_wrap_iter(test_iter())
        async for i in ait:
            print(i)
    
    async def heartbeat():
        while True:
            print('alive')
            await asyncio.sleep(.1)
    
    async def main():
        asyncio.create_task(heartbeat())
        await test()
    
    asyncio.run(main())