
Asyncio yielding results from multiple futures as they arrive


I currently have a function that yields results from an external source; the results may arrive over a long period of time.

import json

import httpx

async def run_command(data):
    async with httpx.AsyncClient() as client:
        url = "http://hostname.com:8000/"
        # timeout=None: the stream may stay open for a long time
        async with client.stream("POST", url, json=data, timeout=None) as r:
            async for line in r.aiter_lines():
                yield (json.loads(line), 200)

This currently works fine for one data call, and I can process the result simply like so:

async for result in run_command(data):
    yield result

The result is passed back to the initial calling function which streams it to a GUI using Django channels.

async for result, status in self.call_command(dataSet):
    await self.channel_layer.group_send(
        self.group_name, {
            'type': "transmit",
            'message': result,
            'status': status,
        },
    )

However, I would now like to call the run_command function concurrently for multiple data sets. I'm trying to do it like so:

for future in asyncio.as_completed([run_command(data) for data in dataSet]):
    yield future

I get a TypeError: An asyncio.Future, a coroutine or an awaitable is required. Replacing the last line with the two lines below still gives the same error.

    result = await future
    yield result

Changing the for loop into an async for loop doesn't work either; it fails with: TypeError: 'async for' requires an object with __aiter__ method, got generator.

So is it possible to yield results as they arrive from multiple futures?


Solution

  • If I understand your requirements correctly, this sounds like a textbook case for a queue producer-consumer setup.

    Let me try and abstract this for you. You have some asynchronous generator that you create by supplying some input data. Once it's running, it yields results at irregular intervals.

    Now you have multiple sets of data. You want to have multiple generators concurrently crunching away at those and you want to be able to process a result as soon as one is yielded by any of the generators.

    The solution I propose is to write two functions, a queue_producer and a queue_consumer. Both receive the same asyncio.Queue instance as an argument.

    The producer also receives one single piece of input data. It sets up the aforementioned asynchronous generator and begins iterating through it. As soon as a new item is yielded to it, it puts it into the queue.

    The consumer is actually itself an asynchronous generator. It receives a timeout argument in addition to the queue. It starts an infinite loop of awaiting the next item it can get from the queue. Once it gets an item, it yields that item. If the waiting takes longer than the specified timeout, it breaks out of the loop and ends.

    To demonstrate this, I will use this very silly and simple asynchronous iterator implementation that simply iterates over characters in a string with random sleep times in between each letter. I'll call it funky_string_iter.

    Here is a full working example:

    from asyncio import Queue, TimeoutError, create_task, gather, run, sleep, wait_for
    from collections.abc import AsyncIterator
    from random import random
    
    async def funky_string_iter(data: str) -> AsyncIterator[str]:
        for char in data:
            await sleep(random())
            yield char
    
    async def queue_producer(queue: Queue[str], data: str) -> None:
        async for result in funky_string_iter(data):
            await queue.put(result)
    
    async def queue_consumer(queue: Queue[str], timeout: float) -> AsyncIterator[str]:
        while True:
            try:
                yield await wait_for(queue.get(), timeout)
            except TimeoutError:
                break
    
    async def main() -> None:
        data_sets = ["abc", "xyz", "foo"]
        q: Queue[str] = Queue()
        # Launch the producer tasks:
        producers = [
            create_task(queue_producer(q, data))
            for data in data_sets
        ]
        # Iterate through the consumer until it times out:
        async for result in queue_consumer(q, timeout=3):
            print(result)
        await gather(*producers)  # Clean up the producer tasks
    
    if __name__ == '__main__':
        run(main())
    

    The output will obviously be more or less randomly ordered, but here is one example output I got:

    a
    f
    o
    b
    x
    y
    z
    o
    c
    

    This demonstrates how the characters are yielded by the queue_consumer as soon as they are available (in the queue), which in turn depends on which queue_producer task yields the next character from its string.
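
    The timeout-based consumer ends only once the queue has been silent for the whole timeout, so a slow source can cut it short and a fast one still costs one full timeout at the end. One possible alternative, sketched here as an assumption and not as part of the example above, is to have every producer put a sentinel into the queue when it finishes and to stop once all sentinels have arrived. The names _DONE, letters, producer, merged and collect are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterator

_DONE = object()  # hypothetical sentinel: "one producer has finished"


async def letters(data: str) -> AsyncIterator[str]:
    """Stand-in for a slow source: yields one character at a time."""
    for char in data:
        await asyncio.sleep(0.01)
        yield char


async def producer(queue: asyncio.Queue, data: str) -> None:
    try:
        async for item in letters(data):
            await queue.put(item)
    finally:
        # Always signal completion, even if the source raised or was cancelled.
        queue.put_nowait(_DONE)


async def merged(data_sets: list[str]) -> AsyncIterator[str]:
    """Yield items from all sources as they arrive; stop when all are done."""
    queue: asyncio.Queue = asyncio.Queue()
    tasks = [asyncio.create_task(producer(queue, d)) for d in data_sets]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        # Cancel whatever is still running (a no-op for finished tasks).
        # Note: return_exceptions=True means producer errors are swallowed here.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def collect(data_sets: list[str]) -> list[str]:
    return [char async for char in merged(data_sets)]


if __name__ == "__main__":
    print(asyncio.run(collect(["abc", "xyz", "foo"])))
```

    The interleaving between sources still depends on timing, but the order within each source is preserved and no timeout has to be guessed.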


    You can transfer this to your specific case by replacing the funky string iterator with your run_command. Naturally, adjust the rest as needed.
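
    As a rough illustration of that substitution, the sketch below keeps the producer and timeout-based consumer pattern but feeds it (result, status) pairs shaped like those in the question. It makes no network calls: fake_run_command is a hypothetical stand-in for run_command, the "id" and "delay" keys are invented for the demo, and the list named received stands in for the group_send call.

```python
import asyncio
from collections.abc import AsyncIterator

Result = tuple[dict, int]  # (decoded JSON line, status), as in the question


async def fake_run_command(data: dict) -> AsyncIterator[Result]:
    """Hypothetical stand-in for run_command: delayed echoes, no HTTP."""
    for step in range(3):
        await asyncio.sleep(0.01 * data["delay"])
        yield ({"id": data["id"], "step": step}, 200)


async def queue_producer(queue: asyncio.Queue, data: dict) -> None:
    # Forward every (result, status) pair from one generator to the shared queue.
    async for pair in fake_run_command(data):
        await queue.put(pair)


async def call_command(data_set: list[dict], timeout: float) -> AsyncIterator[Result]:
    queue: asyncio.Queue = asyncio.Queue()
    producers = [asyncio.create_task(queue_producer(queue, d)) for d in data_set]
    try:
        while True:
            try:
                yield await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                break  # nothing arrived for `timeout` seconds: assume we are done
    finally:
        # Stop producers that are still running and wait for them to finish.
        for task in producers:
            task.cancel()
        await asyncio.gather(*producers, return_exceptions=True)


async def main() -> list[Result]:
    data_set = [{"id": "a", "delay": 1}, {"id": "b", "delay": 2}]
    received = []
    async for result, status in call_command(data_set, timeout=0.5):
        received.append((result, status))  # the question would call group_send here
    return received


if __name__ == "__main__":
    for pair in asyncio.run(main()):
        print(pair)
```

    With a real run_command the timeout has to be longer than the longest expected gap between two lines from any source, otherwise the consumer stops too early.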

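    As for the error you got from iterating over asyncio.as_completed: that function expects awaitables (coroutines, tasks or futures), but calling an async generator function such as run_command returns an async generator object, which is not awaitable. Hence the TypeError. The function is not the right fit here, as it is not intended for asynchronous iterators.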
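
    The TypeError from the question can be reproduced in isolation with the standard library alone. In this small sketch, ticker and answer are hypothetical names: the first is an async generator function (like run_command), the second a plain coroutine function.

```python
import asyncio
import inspect
from collections.abc import AsyncIterator


async def ticker() -> AsyncIterator[int]:
    """Async generator function: calling it returns an async generator object."""
    yield 1


async def answer() -> int:
    """Plain coroutine function: calling it returns an awaitable coroutine."""
    return 42


async def demo() -> tuple[bool, bool, str, list[int]]:
    coro = answer()
    agen_awaitable = inspect.isawaitable(ticker())
    coro_awaitable = inspect.isawaitable(coro)
    coro.close()  # never awaited on purpose; close it to avoid a warning

    try:
        # Depending on the Python version, the TypeError is raised either by
        # the call itself or on the first iteration; both are inside this try.
        for future in asyncio.as_completed([ticker()]):
            await future
        outcome = "no error"
    except TypeError:
        outcome = "TypeError"

    # With real coroutines the same loop works.
    values = [await future for future in asyncio.as_completed([answer(), answer()])]
    return agen_awaitable, coro_awaitable, outcome, values


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

    An async generator object has no __await__ method, which is why as_completed rejects it while accepting the coroutines.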