python, python-3.x, python-asyncio

How to get the first result from async tasks which is not None


I need to stop all tasks in an async function as soon as one of them (task_1, task_2 or task_3) matches a condition. The result must also not be None: if a task returns None, I'd like to keep going until some task produces the first result that is not None. Here is the minimal reproducible code I have so far:

import asyncio


async def task_1(_id):
    _ids = ["230327-12717", "230221-28276", "230214-06090"]

    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_1"


async def task_2(_id):
    _ids = ["230502-14191", "230425-17005", "230327-14434"]

    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_2"


async def task_3(_id):
    _ids = ["230404-23786", "230221-25729", "230221-28276"]

    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_3"


async def main(_id):
    tasks = [task_1(_id), task_2(_id), task_3(_id)]

    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )

    print(finished)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main("230502-14191"))

which gives output:

{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>, <Task finished name='Task-3' coro=<task_3() done, defined at /home/zerox/pipedrive_CRM/minimal.py:21> result=None>, <Task finished name='Task-4' coro=<task_1() done, defined at /home/zerox/pipedrive_CRM/minimal.py:5> result=None>}

And my expected output should be something like this:

{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>}

How can I achieve that?


Solution

  • This seems like an XY Problem, so I'll try and interpret what you actually want to accomplish.

    Whenever you are dealing with multiple tasks that need to be coordinated in some way, it is a good idea to start with the available synchronization primitives.

    In this case, you want the search for your specific ID to stop in all tasks, as soon as possible, after it has been found in one of them.

    An Event object can be very useful in such a situation. The main coroutine schedules the "searching" tasks and then simply waits for the event to be set. The tasks all carry a reference to that event object, and once one of them finds the ID, it sets the event. The main coroutine then simply cancels all the tasks and cleans up.

    Demo:

    from asyncio import Event, create_task, gather, run, sleep
    from collections.abc import Iterable
    
    
    async def _search(id_: str, ids: Iterable[str], found: Event, name: str) -> None:
        for i in ids:
            await sleep(0)
            print(f"{name} checking {i}")
            if i == id_:
                print(f"Found {id_} in {name}")
                found.set()
    
    
    async def task_1(id_: str, found: Event) -> None:
        ids = ["230327-12717", "230221-28276", "230214-06090"]
        await _search(id_, ids, found, "task_1")
    
    
    async def task_2(id_: str, found: Event) -> None:
        ids = ["230502-14191", "230425-17005", "230327-14434"]
        await _search(id_, ids, found, "task_2")
    
    
    async def task_3(id_: str, found: Event) -> None:
        ids = ["230404-23786", "230221-25729", "230221-28276"]
        await _search(id_, ids, found, "task_3")
    
    
    async def main(id_: str) -> None:
        found = Event()
        tasks = [
            create_task(task_1(id_, found)),
            create_task(task_2(id_, found)),
            create_task(task_3(id_, found)),
        ]
        await found.wait()
        for task in tasks:
            task.cancel()
        await gather(*tasks, return_exceptions=True)
    
    
    if __name__ == "__main__":
        run(main("230502-14191"))
    

    Output:

    task_1 checking 230327-12717
    task_2 checking 230502-14191
    Found 230502-14191 in task_2
    task_3 checking 230404-23786
    task_1 checking 230221-28276
    

    A few things to note:

    • Some await within the for-loop is necessary for this simple example to ensure that a context switch can happen. Otherwise the tasks will simply execute sequentially. That is why I added the await asyncio.sleep(0).
    • As you can see, even though the ID was found by task 2, both task 1 and task 3 each still had one more go. That cannot be prevented, because the event loop itself decides which coroutine to switch to at any given moment. And await found.wait() in our main coroutine only guarantees that it will block until the event is set, not that it receives control right away. But you'll notice that none of the tasks actually finished, i.e. none of them went through all their IDs, because they were cancelled before they could.
    • I am using asyncio.gather with return_exceptions=True because I do not want any of the CancelledErrors to propagate up to the main coroutine.
    • Technically you don't need the gather at all because the coroutines will be cancelled regardless, but it is bad form to leave dangling un-await-ed tasks lying around. You should always await your tasks at some point. And gather seems like the easiest solution here.
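
To make the first note concrete, here is a minimal sketch (the names scan, run_scans and yield_control are made up for this illustration and do not come from the code above). It records the order in which two coroutines process their items, once without any await in the loop and once with await asyncio.sleep(0):

```python
import asyncio


async def scan(name: str, items: list[str], log: list[str], yield_control: bool) -> None:
    for item in items:
        if yield_control:
            # sleep(0) suspends this coroutine once so that other tasks get a turn.
            await asyncio.sleep(0)
        log.append(f"{name}:{item}")


async def run_scans(yield_control: bool) -> list[str]:
    log: list[str] = []
    # gather wraps both coroutines in tasks and waits for both of them.
    await asyncio.gather(
        scan("a", ["1", "2"], log, yield_control),
        scan("b", ["1", "2"], log, yield_control),
    )
    return log


sequential = asyncio.run(run_scans(yield_control=False))
interleaved = asyncio.run(run_scans(yield_control=True))
print(sequential)   # all of "a" first, then all of "b"
print(interleaved)  # entries of "a" and "b" alternate
```

Without the await, the first coroutine never gives up control, so it works through its whole list before the second one even starts.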

    PS: Returning results from asyncio.gather

    If you want the task_ functions to return something, gather will collect those results for you too:

    from asyncio import Event, create_task, gather, run, sleep
    from collections.abc import Iterable
    from typing import Optional
    
    
    async def _search(
        id_: str, ids: Iterable[str], found: Event, name: str
    ) -> Optional[str]:
        for i in ids:
            await sleep(0)
            print(f"{name} checking {i}")
            if i == id_:
                found.set()
                return f"Found {id_} in {name}"
        return None  # no match in this list
    
    
    async def task_1(id_: str, found: Event) -> Optional[str]:
        ids = ["230327-12717", "230221-28276", "230214-06090"]
        return await _search(id_, ids, found, "task_1")
    
    
    async def task_2(id_: str, found: Event) -> Optional[str]:
        ids = ["230502-14191", "230425-17005", "230327-14434"]
        return await _search(id_, ids, found, "task_2")
    
    
    async def task_3(id_: str, found: Event) -> Optional[str]:
        ids = ["230404-23786", "230221-25729", "230221-28276"]
        return await _search(id_, ids, found, "task_3")
    
    
    async def main(id_: str) -> None:
        found = Event()
        tasks = [
            create_task(task_1(id_, found)),
            create_task(task_2(id_, found)),
            create_task(task_3(id_, found)),
        ]
        await found.wait()
        for task in tasks:
            task.cancel()
        results = await gather(*tasks, return_exceptions=True)
        for result in results:
            # Cancelled tasks show up as CancelledError, which derives from
            # BaseException (not Exception) since Python 3.8. A task that ran
            # out of IDs without a match yields None.
            if result is not None and not isinstance(result, BaseException):
                print(result)
    
    
    if __name__ == "__main__":
        run(main("230502-14191"))
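
A detail that matters when filtering the list returned by gather: assuming Python 3.8 or newer, asyncio.CancelledError derives from BaseException rather than Exception, so a check against Exception alone does not catch the entries of cancelled tasks. A small sketch to verify this (never_finishes and demo are made-up names for this illustration):

```python
import asyncio


async def never_finishes() -> str:
    await asyncio.sleep(3600)
    return "done"


async def demo() -> list:
    task = asyncio.create_task(never_finishes())
    await asyncio.sleep(0)  # let the task start before cancelling it
    task.cancel()
    # With return_exceptions=True the cancellation is returned, not raised.
    return await asyncio.gather(task, return_exceptions=True)


results = asyncio.run(demo())
cancelled = results[0]
print(type(cancelled).__name__)
print(isinstance(cancelled, Exception), isinstance(cancelled, BaseException))
```

On Python 3.8+ the first isinstance check is False and the second is True, which is why filtering on BaseException is the safer choice.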
    

    PPS: Use a Queue instead of an Event

    Alternatively, you can set up a Queue to put the result in and simply await that result in the main coroutine. That may be a more elegant solution altogether, but I suppose that is a matter of preference:

    from asyncio import Queue, create_task, gather, run, sleep
    from collections.abc import Iterable
    
    
    async def _search(id_: str, ids: Iterable[str], q: Queue, name: str) -> None:
        for i in ids:
            await sleep(0)
            print(f"{name} checking {i}")
            if i == id_:
                q.put_nowait(f"Found {id_} in {name}")
    
    
    async def task_1(id_: str, q: Queue) -> None:
        ids = ["230327-12717", "230221-28276", "230214-06090"]
        await _search(id_, ids, q, "task_1")
    
    
    async def task_2(id_: str, q: Queue) -> None:
        ids = ["230502-14191", "230425-17005", "230327-14434"]
        await _search(id_, ids, q, "task_2")
    
    
    async def task_3(id_: str, q: Queue) -> None:
        ids = ["230404-23786", "230221-25729", "230221-28276"]
        await _search(id_, ids, q, "task_3")
    
    
    async def main(id_: str) -> None:
        result_queue = Queue()
        tasks = [
            create_task(task_1(id_, result_queue)),
            create_task(task_2(id_, result_queue)),
            create_task(task_3(id_, result_queue)),
        ]
        result = await result_queue.get()
        for task in tasks:
            task.cancel()
        await gather(*tasks, return_exceptions=True)
        print(result)
    
    
    if __name__ == "__main__":
        run(main("230502-14191"))
    

    Awaiting Queue.get will block until there is an item in the queue, then return that item. So this accomplishes essentially the same as the Event from before, but it also gives you the result right away.

    You should still clean up (i.e. cancel and await) the other tasks though.
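
If all you need is literally what the title asks for (the first return value that is not None), asyncio.as_completed may be enough without any Event or Queue. The following is only a sketch under that assumption; first_not_none and lookup are hypothetical helpers, and the delays merely simulate work of different duration. Unlike waiting on an Event or a Queue, it also terminates (returning None) when no task finds the ID:

```python
import asyncio
from collections.abc import Awaitable, Iterable
from typing import Optional


async def first_not_none(aws: Iterable[Awaitable[Optional[str]]]) -> Optional[str]:
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    try:
        # as_completed hands out awaitables in the order the tasks finish.
        for next_done in asyncio.as_completed(tasks):
            result = await next_done
            if result is not None:
                return result
        return None  # every task finished without a match
    finally:
        # Cancel whatever is still running and await it, so nothing dangles.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def lookup(id_: str, ids: list[str], name: str, delay: float) -> Optional[str]:
    await asyncio.sleep(delay)  # stand-in for real work
    return f"Found {id_} in {name}" if id_ in ids else None


async def main(id_: str) -> Optional[str]:
    return await first_not_none([
        lookup(id_, ["230327-12717"], "task_1", 0.01),
        lookup(id_, ["230502-14191"], "task_2", 0.02),
        lookup(id_, ["230404-23786"], "task_3", 0.05),
    ])


hit = asyncio.run(main("230502-14191"))
miss = asyncio.run(main("000000-00000"))
print(hit)
print(miss)
```

Here task_1 finishes first with None and is skipped, task_2 supplies the result, and task_3 is cancelled in the finally block. The trade-off is that the tasks cannot stop each other early through a shared object; they are only cancelled from the outside once a result is in.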