Test calling a Python coroutine (async def) from a regular function

Let's say I have some asyncio coroutine which fetches some data and returns it. Like this:

async def fetch_data(*args):
  result = await some_io()
  return result

Normally this coroutine is called from a chain of other coroutines, and the initial coroutine is run by creating a task. But what if, for testing purposes, I want to run just this one coroutine directly when executing a file:

if __name__ == '__main__':
  result = await fetch_data(*args)

Obviously I can't do this, because I'm trying to await a coroutine outside of a coroutine function. So the question is: is there a correct way to get the data from a coroutine without calling it from another coroutine? I could create a Future object for the result and await it, but maybe there is a simpler and clearer way?


  • You will need to create an event loop to run your coroutine:

    import asyncio

    async def async_func():
        return "hello"

    # Create an event loop and block until the coroutine finishes.
    loop = asyncio.new_event_loop()
    result = loop.run_until_complete(async_func())
    loop.close()

    Or as a function:

    def run_coroutine(f, *args, **kwargs):
        # f is a coroutine function; calling it creates the coroutine object.
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(f(*args, **kwargs))
        finally:
            loop.close()

    Use it like this (keyword arguments are forwarded as well, provided the coroutine function accepts them):

    assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")
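
On Python 3.7 and later, asyncio.run() may be a simpler way to do the same thing: it creates a new event loop, runs the coroutine to completion, closes the loop, and returns the result. The following is only a minimal sketch. Here some_io() is a hypothetical stand-in for the real I/O call from the question, and main() and the "expected" return value are made up for illustration.

```python
import asyncio


async def some_io():
    # Hypothetical stand-in for the real I/O call in the question.
    await asyncio.sleep(0)
    return "expected"


async def fetch_data(*args):
    result = await some_io()
    return result


def main():
    # asyncio.run() creates a fresh event loop, runs the coroutine until
    # it completes, closes the loop, and returns the coroutine's result.
    return asyncio.run(fetch_data("param1"))


if __name__ == "__main__":
    print(main())
```

Note that asyncio.run() cannot be called while another event loop is already running in the same thread, so this approach fits a script entry point or a plain synchronous test rather than code that is itself called from a coroutine.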