I have two iterators:

    import asyncio

    async def iter1():
        for i in range(5):
            await asyncio.sleep(0.1)
            yield 1

    async def iter2():
        for i in range(10):
            await asyncio.sleep(1)
            yield 2

I want to create a function async_zip_stream(*args, default_value=0). It should iterate over all the passed iterators simultaneously, without synchronizing them, and yield tuples of values. If an iterator has not produced a value at a given point, its position in the tuple should hold default_value.
So in this case, the output would be:
1, 0
1, 0
1, 0
1, 0
1, 0
0, 2
....
I don't quite understand how this can be done without synchronization between the iterators.

    async def async_zip_stream(*iterators, default_value=0):
        tasks = [asyncio.create_task(iterator.__anext__()) for iterator in iterators]
        done = [False] * len(iterators)
        while not all(done):
            results = []
            for i, task in enumerate(tasks):
                if done[i]:
                    results.append(default_value)
                elif task.done():
                    try:
                        results.append(task.result())
                        tasks[i] = asyncio.create_task(iterators[i].__anext__())
                    except StopAsyncIteration:
                        done[i] = True
                        results.append(default_value)
                else:
                    results.append(default_value)
            yield tuple(results)
            await asyncio.sleep(0.01)  # Slight delay to allow other tasks to progress
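
The loop above polls every 0.01 seconds, so it also yields a tuple on every tick where nothing arrived, and those tuples contain only default values. One possible alternative, sketched here under the assumption that a tuple should be emitted only when at least one iterator has actually produced a value, is to keep one pending task per iterator and wait on them with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED). The helper name _next_item is hypothetical and exists only for this illustration.

```python
import asyncio


async def _next_item(iterator):
    # Hypothetical helper: wraps __anext__() in a plain coroutine so that
    # it can be scheduled with asyncio.create_task().
    return await iterator.__anext__()


async def async_zip_stream(*iterators, default_value=0):
    # Map each pending "next item" task to the position of its iterator.
    pending = {
        asyncio.create_task(_next_item(iterator)): index
        for index, iterator in enumerate(iterators)
    }
    while pending:
        # Suspend until at least one iterator has produced a value or finished.
        done, _ = await asyncio.wait(
            list(pending), return_when=asyncio.FIRST_COMPLETED
        )
        row = [default_value] * len(iterators)
        produced = False
        for task in done:
            index = pending.pop(task)
            try:
                row[index] = task.result()
            except StopAsyncIteration:
                continue  # this iterator is exhausted, so it is not rescheduled
            produced = True
            # Request the next value from the iterator that just delivered one.
            pending[asyncio.create_task(_next_item(iterators[index]))] = index
        if produced:
            yield tuple(row)
```

With iter1() and iter2() as defined in the question, iterating with `async for row in async_zip_stream(iter1(), iter2())` should produce five (1, 0) rows followed by (0, 2) rows. No polling delay is needed, and the iterators never wait for each other: a slow iterator simply contributes default_value until its own task completes. If the consumer stops iterating early, the tasks that are still pending would need to be cancelled, which this sketch leaves out.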