Tags: python, python-3.x, python-asyncio

asyncio: how to chain coroutines


I have the following test code, in which I am trying to chain together different coroutines. The idea is that one coroutine downloads data, and as soon as the data has been downloaded it should be passed to a second coroutine that processes it. The code below works when I skip the process_data step, but as soon as I include it (i.e. chain the coroutines together) it fails. How can I fix it?

import asyncio
import time

task_inputs = [0,1,2,3,4,5,4,3,4]

async def download_dummy(url):
    await asyncio.sleep(url)
    data = url
    print(f'downloaded {url}')
    return data

async def process_data(data):
    await asyncio.sleep(1)
    processed_data = data*2
    print(f"processed {data}")
    return processed_data

async def main(task_inputs):
    task_handlers  = []
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:
            res = tg.create_task(process_data(download_dummy(task)))
            # res = tg.create_task(download_dummy(task))
            task_handlers.append(res)

    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)

asyncio.run(main(task_inputs))

The error I get is rather telling: it seems that the first coroutine is not actually executed when it is passed to the second coroutine, but I am not sure how to fix this elegantly.

+ Exception Group Traceback (most recent call last):
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2252, in <module>
  |     main()
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2234, in main
  |     globals = debugger.run(setup['file'], None, None, is_module)
  |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1544, in run
  |     return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1551, in _exec
  |     pydev_imports.execfile(file, globals, locals)  # execute the script
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
  |     exec(compile(contents+"\n", file, 'exec'), glob, loc)
  |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 31, in <module>
  |     asyncio.run(main(task_inputs))
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 194, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\base_events.py", line 687, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 21, in main
  |     async with asyncio.TaskGroup() as tg:
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\taskgroups.py", line 145, in __aexit__
  |     raise me from None
  | ExceptionGroup: unhandled errors in a TaskGroup (9 sub-exceptions)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 2 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 3 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 4 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 5 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 6 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 7 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 8 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 9 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +------------------------------------

Solution

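  • The problem is that the line tg.create_task(process_data(download_dummy(task))) won't call process_data with the result of download_dummy. Calling download_dummy(task) only creates a coroutine object (an awaitable) without running its body, so the parameter that arrives inside process_data is that coroutine object itself. It has to be awaited to get its value, which is why data*2 fails with "unsupported operand type(s) for *: 'coroutine' and 'int'".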

    For simple, hard-coded pipelines, I'd usually just write a small function that calls the coroutines in order:

    async def pipeline(arg):
        # Await each step so the next one receives a real value, not a coroutine
        step1 = await download_dummy(arg)
        step2 = await process_data(step1)
        return step2
    
    

    and then call tg.create_task(pipeline(task)) inside the loop in main.

    The pipeline can also be made generic, receiving the coroutine functions to run in series at runtime; that should work even for more complicated chains:

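    from typing import Any, Awaitable, Callable, Sequence
    
    async def pipeline(coroutines: Sequence[Callable[[Any], Awaitable[Any]]], initial_arg: Any) -> Any:
        # Each item is a coroutine *function*: call it with the previous
        # result and await it before moving on to the next step.
        partial = initial_arg
        for coroutine in coroutines:
            partial = await coroutine(partial)
        return partial
    [...]
    async def main(task_inputs):
        task_handlers = []
        print(f"started at {time.strftime('%X')}")
        chain = [download_dummy, process_data]
        async with asyncio.TaskGroup() as tg:
            for task in task_inputs:
                res = tg.create_task(pipeline(chain, task))
                task_handlers.append(res)
        print(f"finished at {time.strftime('%X')}")
        results = [task_handler.result() for task_handler in task_handlers]
        print(results)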
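To see the root cause described at the start of this answer in isolation, here is a minimal sketch. It uses a trimmed-down download_dummy with the sleep set to 0 (an assumption, just to keep it fast) and shows three things: calling a coroutine function returns a coroutine object rather than its result, multiplying that object reproduces the TypeError from the question, and only await yields the actual value.

```python
import asyncio
import inspect


async def download_dummy(url):
    # Same idea as the question's downloader, with the sleep shortened to 0
    await asyncio.sleep(0)
    return url


async def demo():
    # Calling the coroutine function does NOT run its body;
    # it only creates a coroutine object.
    pending = download_dummy(3)
    is_coro = inspect.iscoroutine(pending)

    # This is what process_data effectively did: arithmetic on the object itself.
    unawaited = download_dummy(3)
    try:
        unawaited * 2
    except TypeError as exc:
        error = str(exc)
    finally:
        # Close the never-started coroutine so Python does not warn
        # that it was never awaited.
        unawaited.close()

    # Awaiting runs the body and produces the actual value.
    value = await pending
    return is_coro, error, value


is_coro, error, value = asyncio.run(demo())
print(is_coro)  # True
print(error)    # unsupported operand type(s) for *: 'coroutine' and 'int'
print(value)    # 3
```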