
How to iterate a blocking iterator with asyncio?


A third-party API library provides an iterator for listing items, with built-in pagination. The iterator is blocking, and I would like to run multiple listings in parallel.

async def list_multiple(params_list):
    async_tasks = []
    for params in params_list:
        async_tasks.append(list_one(**params))
    await asyncio.gather(*async_tasks)


async def list_one(**kwargs):
    blocking_iterator = some_library.get_api_list_iterator(**kwargs)
    async for item in iterate_blocking(blocking_iterator):
        pass  # do things


async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    while True:
        try:
            yield await loop.run_in_executor(None, iterator.next)
        except StopIteration:
            break

But doing this raises

TypeError: StopIteration interacts badly with generators and cannot be raised into a Future

and blocks all threads. How do I iterate a blocking iterator correctly?


Solution

  • Note that in Python 3 the method used for iteration is called __next__, not next. iterator.next probably works here only because the library sets up some Python 2 compatibility code.

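    The error occurs because run_in_executor tries to store the StopIteration raised in the worker thread on the future it returns, and asyncio refuses to set StopIteration as a future's exception. You can fix the issue by catching StopIteration while still in the auxiliary thread and using a different exception, or another kind of signal, to indicate the end of iteration. For example, this code uses a sentinel object: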

    async def iterate_blocking(iterator):
        loop = asyncio.get_running_loop()
        DONE = object()
        def get_next():
            try:
                return iterator.__next__()
            except StopIteration:
                return DONE
    
        while True:
            obj = await loop.run_in_executor(None, get_next)
            if obj is DONE:
                break
            yield obj
    

    This can be further simplified using the two-argument form of the next built-in, which does essentially the same thing as get_next:

    async def iterate_blocking(iterator):
        loop = asyncio.get_running_loop()
        DONE = object()
        while True:
            obj = await loop.run_in_executor(None, next, iterator, DONE)
            if obj is DONE:
                break
            yield obj
    

    (Both examples above are untested, so typos are possible.)
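
To try the sentinel approach end to end without the real library, here is a minimal sketch. The generator fake_api_list and its prefix, count, and delay parameters are hypothetical stand-ins for some_library.get_api_list_iterator, which is not reproduced here; time.sleep simulates the blocking page fetch.

```python
import asyncio
import time


def fake_api_list(prefix, count, delay=0.05):
    """Hypothetical stand-in for the library's blocking, paginated iterator."""
    for i in range(count):
        time.sleep(delay)  # simulates a blocking network call
        yield f"{prefix}-{i}"


async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    done = object()  # sentinel returned by next() when the iterator is exhausted
    while True:
        # Two-argument next() returns `done` instead of raising StopIteration,
        # so no StopIteration ever has to cross into the Future.
        obj = await loop.run_in_executor(None, next, iterator, done)
        if obj is done:
            break
        yield obj


async def list_one(**kwargs):
    # Collect the items; a real handler would process each one instead.
    return [item async for item in iterate_blocking(fake_api_list(**kwargs))]


async def list_multiple(params_list):
    # gather() returns the results in the same order as params_list.
    return await asyncio.gather(*(list_one(**params) for params in params_list))


results = asyncio.run(list_multiple([
    {"prefix": "a", "count": 3},
    {"prefix": "b", "count": 2},
]))
print(results)  # [['a-0', 'a-1', 'a-2'], ['b-0', 'b-1']]
```

Each item costs one round trip to the default thread pool, so this pattern suits iterators whose per-item work is dominated by I/O. Each iterator is only ever advanced by one thread at a time, because the next call is awaited before another one is submitted.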