Zipping async iterators in Python


I have two iterators:

import asyncio

async def iter1():
    for i in range(5):
        await asyncio.sleep(0.1)
        yield 1

async def iter2():
    for i in range(10):
        await asyncio.sleep(1)
        yield 2

I want to create a function async_zip_stream(*args, default_value=0).

It should iterate over all the passed iterators concurrently, without waiting for them to advance in lockstep, and yield tuples of values. If an iterator has no value ready at a given moment, its slot in the tuple should hold default_value.

So in this case, the output would be:

1, 0
1, 0
1, 0 
1, 0
1, 0 
0, 2
....

I don't quite understand how this can be done without synchronization between the iterators.


Solution

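    import asyncio

    async def async_zip_stream(*iterators, default_value=0):
        # Start fetching the first item from every iterator right away.
        tasks = [asyncio.create_task(iterator.__anext__()) for iterator in iterators]
        done = [False] * len(iterators)

        while not all(done):
            results = []
            got_value = False  # did any iterator deliver an item in this pass?

            for i, task in enumerate(tasks):
                if done[i]:
                    # Iterator is exhausted: its slot stays at the default.
                    results.append(default_value)
                elif task.done():
                    try:
                        results.append(task.result())
                        got_value = True
                        # Schedule the fetch of this iterator's next item.
                        tasks[i] = asyncio.create_task(iterators[i].__anext__())
                    except StopAsyncIteration:
                        done[i] = True
                        results.append(default_value)
                else:
                    # Nothing ready yet from this iterator.
                    results.append(default_value)

            # Skip passes where nothing arrived, so no all-default tuples are emitted.
            if got_value:
                yield tuple(results)
            await asyncio.sleep(0.01)  # Polling interval; also lets the pending tasks run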
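
The loop above wakes up every 10 ms to check the tasks. As a rough alternative sketch, not part of the original answer, `asyncio.wait` with `return_when=asyncio.FIRST_COMPLETED` can suspend until some iterator actually delivers an item, so no polling interval is needed. The names `async_zip_stream_wait`, `_next_or_done` and `_DONE` are made up for this illustration.

```python
import asyncio

_DONE = object()  # hypothetical sentinel: the wrapped iterator is exhausted


async def _next_or_done(iterator):
    """Await one item; return _DONE instead of raising StopAsyncIteration."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def async_zip_stream_wait(*iterators, default_value=0):
    # Map each in-flight task to the tuple slot it will fill.
    pending = {
        asyncio.create_task(_next_or_done(it)): i
        for i, it in enumerate(iterators)
    }
    try:
        while pending:
            # Suspend until at least one iterator has produced something.
            done, _ = await asyncio.wait(
                pending.keys(), return_when=asyncio.FIRST_COMPLETED
            )
            row = [default_value] * len(iterators)
            got_value = False
            for task in done:
                i = pending.pop(task)
                value = task.result()
                if value is _DONE:
                    continue  # exhausted: the slot keeps default_value
                row[i] = value
                got_value = True
                # Request the next item from the same iterator.
                pending[asyncio.create_task(_next_or_done(iterators[i]))] = i
            if got_value:
                yield tuple(row)
    finally:
        # Cancel leftover fetches if the consumer stops iterating early.
        for task in pending:
            task.cancel()
```

With iter1() and iter2() from the question, `async for row in async_zip_stream_wait(iter1(), iter2()): print(row)` should print five (1, 0) rows followed by (0, 2) rows, assuming the sleeps behave as written. Values that become ready in the same wake-up share one tuple.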