Asyncio - repeatedly invoke multiple functions and merge results


I have two functions:

import asyncio
import itertools


async def f(i):
    await asyncio.sleep(1)
    return f'f{i}'


async def g(i):
    await asyncio.sleep(2)
    return f'g{i}'

I want to write a loop that calls them repeatedly and prints the results as they come. Like this imaginary code:

for c in amerge(amap(f, itertools.count()), 
                amap(g, itertools.count())):
    x = await c
    print(x)

The output should be roughly f0, f1, g0, f2, f3, g1, f4, f5, g2, ...

My attempt was this:

async def run():
    """bad, always wait for f, then for g"""
    for i in itertools.count():
        for c in asyncio.as_completed([f(i), g(i)]):
            res = await c
            print(res)

asyncio.run(run())

But this is not correct: it prints f0, g0, f1, g1, ... because each pass of the outer loop waits for both f(i) and g(i) before moving on to i + 1.

Any ideas?


Solution

  • The aiostream library provides aiostream.stream.merge, which combines multiple async generators into a single stream that yields items as they become available. If we rewrite your code like this:

    import asyncio
    import itertools
    
    from aiostream import stream
    
    
    async def f(i):
        await asyncio.sleep(1)
        return f"f{i}"
    
    
    async def g(i):
        await asyncio.sleep(2)
        return f"g{i}"
    
    
    async def amap(func, iterable):
        for i in iterable:
            res = await func(i)
            yield res
    
    
    async def run():
        async with stream.merge(
            amap(f, itertools.count()), amap(g, itertools.count())
        ).stream() as streamer:
            async for x in streamer:
                print(x)
    
    
    asyncio.run(run())
    

    We get as output:

    f0
    f1
    g0
    f2
    f3
    g1
    f4
    f5
    g2
    f6
    f7
    g3
    ...
    

    The code closely mirrors your pseudocode: amerge is provided by aiostream.stream.merge, amap is a small async generator, and the loop uses async for instead of awaiting each item by hand.
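
If adding a dependency is not an option, the same merging idea can be sketched with the standard library alone. The amerge function below is a hypothetical stand-in written for illustration, not aiostream's implementation: it runs one task per source, funnels items through an asyncio.Queue, and yields them in arrival order. The demo helper and its delays (0.1 s and 0.25 s, scaled down from the question and chosen so completions never tie) are also assumptions.

```python
import asyncio


async def amap(func, iterable):
    # Same helper as in the answer: await func on each item, in order.
    for i in iterable:
        yield await func(i)


async def amerge(*sources):
    """Hypothetical stdlib-only merge: yield items from all sources as they arrive."""
    queue = asyncio.Queue()
    done = object()  # sentinel put on the queue once per finished source

    async def drain(source):
        try:
            async for item in source:
                await queue.put(item)
        finally:
            queue.put_nowait(done)  # unbounded queue, so this never blocks

    tasks = [asyncio.create_task(drain(s)) for s in sources]
    try:
        remaining = len(tasks)
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
        # All sources have finished; re-raise the first error, if any.
        # Note: an error in one source surfaces only after every source ends.
        await asyncio.gather(*tasks)
    finally:
        # Stop the producers if the consumer exits early.
        for task in tasks:
            task.cancel()


async def demo():
    # Finite inputs and short delays so the sketch terminates quickly.
    async def f(i):
        await asyncio.sleep(0.1)
        return f"f{i}"

    async def g(i):
        await asyncio.sleep(0.25)
        return f"g{i}"

    return [x async for x in amerge(amap(f, range(4)), amap(g, range(2)))]
```

With the original f and g and itertools.count() as inputs, iterating amerge with async for should interleave results much like the aiostream version, although the relative order of items that complete at the same instant is not guaranteed. Because the queue is unbounded, a slow consumer gets no backpressure; passing a maxsize to asyncio.Queue would be one way to add it.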