A 3rd-party API library provides an iterator for listing items, with built-in pagination. The iterator is blocking, and I would like to run multiple listings in parallel.
import asyncio

async def list_multiple(params_list):
    async_tasks = []
    for params in params_list:
        async_tasks.append(list_one(**params))
    await asyncio.gather(*async_tasks)

async def list_one(**kwargs):
    blocking_iterator = some_library.get_api_list_iterator(**kwargs)
    async for item in iterate_blocking(blocking_iterator):
        pass  # do things

async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    while True:
        try:
            yield await loop.run_in_executor(None, iterator.next)
        except StopIteration:
            break
But doing this raises
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
and blocks all threads. How do I iterate a blocking iterator correctly?
Note that the method used for iteration is called __next__ in Python 3, not next. The next method probably works because of some Python 2 compatibility code set up by the library.
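To illustrate the naming point, here is a minimal sketch using a hypothetical Countdown iterator (not part of any real library). It defines __next__ and adds a Python 2 style next alias, which is roughly what a compatibility layer might do. The built-in next() goes through __next__:

```python
class Countdown:
    """Hypothetical iterator, for illustration only."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol: signal exhaustion with StopIteration.
        if self.current <= 0:
            raise StopIteration
        self.current -= 1
        return self.current + 1

    # Python 2 style alias, similar to what a compatibility shim might add.
    next = __next__


it = Countdown(3)
first = next(it)        # built-in next() calls __next__
second = it.__next__()  # calling the protocol method directly
third = it.next()       # works only because of the alias above
```

Without the alias, it.next() would raise AttributeError on Python 3, so relying on __next__ (or the next() built-in) is the safer choice.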
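The error occurs because the StopIteration raised in the worker thread has to be propagated into the awaited Future, which asyncio refuses to do. You can fix the issue by catching StopIteration while still in the auxiliary thread, and using a different exception, or another kind of signal, to indicate the end of iteration. For example, this code uses a sentinel object: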
async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    DONE = object()  # unique sentinel that the iterator cannot produce

    def get_next():
        # Runs in the executor thread; StopIteration never leaves it.
        try:
            return iterator.__next__()
        except StopIteration:
            return DONE

    while True:
        obj = await loop.run_in_executor(None, get_next)
        if obj is DONE:
            break
        yield obj
This can be further simplified using the two-argument form of the next built-in, which does essentially the same thing as get_next:
async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    DONE = object()
    while True:
        # next(iterator, DONE) returns DONE instead of raising StopIteration
        obj = await loop.run_in_executor(None, next, iterator, DONE)
        if obj is DONE:
            break
        yield obj
(Both examples above are untested, so typos are possible.)
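For a quick end-to-end check, the sentinel approach can be exercised without the real library. The sketch below is an assumption-laden stand-in: fake_api_list_iterator is a hypothetical generator that sleeps to mimic a blocking paginated call, and list_one collects items instead of processing them. Only asyncio and time from the standard library are used.

```python
import asyncio
import time


def fake_api_list_iterator(prefix, count, delay=0.01):
    """Hypothetical stand-in for the library's blocking iterator."""
    for i in range(count):
        time.sleep(delay)  # simulate a blocking network round trip
        yield f"{prefix}-{i}"


async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    done = object()  # sentinel marking the end of iteration
    while True:
        # Each blocking step runs in the default thread pool executor.
        obj = await loop.run_in_executor(None, next, iterator, done)
        if obj is done:
            break
        yield obj


async def list_one(**kwargs):
    items = []
    async for item in iterate_blocking(fake_api_list_iterator(**kwargs)):
        items.append(item)
    return items


async def list_multiple(params_list):
    # gather() returns results in the same order as params_list.
    return await asyncio.gather(*(list_one(**p) for p in params_list))


def run_demo():
    params_list = [
        {"prefix": "a", "count": 3},
        {"prefix": "b", "count": 2},
    ]
    return asyncio.run(list_multiple(params_list))


if __name__ == "__main__":
    print(run_demo())
```

Because each call to next runs in the executor, the event loop stays free to interleave the listings. The number that actually run at once is bounded by the default executor's worker count.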