python, asynchronous, async-await, python-asyncio

asyncio.sleep is behaving like a blocker


I can't get this to run the way it's supposed to. I have a store function that simulates writing to a database (for now it writes to a text file instead). It takes a boolean flag that introduces a delay.

import asyncio
import os
import time

DB_FILE = "db.txt"  # assumed path; the original post does not show how DB_FILE is defined

async def store(text, delay: bool = False):
    mode = "a" if os.path.exists(DB_FILE) else "w"
    if delay:
        await asyncio.sleep(5)
    with open(DB_FILE, mode=mode) as f:
        timestamp = str(time.time())
        row = f"{text}\t{timestamp}\n"
        f.write(row)
        f.flush()
        print("done saving")

I then have a function that simulates writing several changes to that database and then gets the latest one. If the last store call has a delay, it won't count as the "latest".

texts = [("how", False), ("are", False), ("you", True)]

async def run_tests():
    initial_text = "hi"
    await store(initial_text)
    curr_text = initial_text
    last_finished = initial_text
    for text, delay in texts:
        curr_text = text  # track the text stored in this iteration
        await store(text, delay=delay)
        if not delay:  # if finished
            last_finished = curr_text
    assert last_finished == "are"


if __name__ == "__main__":
    asyncio.run(run_tests())

I tried running this with both asyncio.run() and loop.run_until_complete(), but neither is working. asyncio.sleep still behaves like time.sleep.

Ideally I want the line

await store(text, delay=delay)

to execute asynchronously while the rest of the function run_tests keeps going.

What am I doing wrong?


Solution

  • It is not asyncio.sleep that is blocking you here. The problem is that you await each store coroutine inside your for-loop, so each call has to run to completion before the loop moves on to the next iteration.

    If you want to schedule each coroutine immediately, but not wait for it to finish, you can use asyncio.create_task on them. Just make sure to await all the tasks at some point.

    Here is a simplified example:

    from asyncio import create_task, gather, run, sleep
    
    
    async def store(text: str, delay: bool = False) -> None:
        print(text)
        if delay:
            await sleep(5)
    
    
    texts = [
        ("hi", False),
        ("how", False),
        ("are", False),
        ("you", True),
    ]
    
    
    async def run_tests() -> None:
        last_finished: str | None = None
        tasks = []
        for text, delay in texts:
            tasks.append(create_task(store(text, delay=delay)))
            if not delay:
                last_finished = text
        await gather(*tasks)
        assert last_finished == "are"
    
    
    if __name__ == "__main__":
        run(run_tests())
    

    I used asyncio.gather here for convenience to await all the tasks at the end.
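To make the difference measurable, here is a minimal timing sketch. It assumes a stand-in store that takes a float delay in seconds instead of the boolean flag, and the helper names run_sequential and run_concurrent are hypothetical, chosen only for this illustration.

```python
import asyncio
import time


async def store(text: str, delay: float = 0.0) -> str:
    # Stand-in for the store coroutine; the float delay is an assumption.
    await asyncio.sleep(delay)
    return text


async def run_sequential(items):
    # Awaiting each coroutine directly: the next call only starts
    # after the previous one has finished.
    start = time.perf_counter()
    for text, delay in items:
        await store(text, delay)
    return time.perf_counter() - start


async def run_concurrent(items):
    # Scheduling every call as a task first, then awaiting them together.
    start = time.perf_counter()
    tasks = [asyncio.create_task(store(text, delay)) for text, delay in items]
    results = await asyncio.gather(*tasks)
    return time.perf_counter() - start, results


ITEMS = [("how", 0.2), ("are", 0.2), ("you", 0.2)]

if __name__ == "__main__":
    seq = asyncio.run(run_sequential(ITEMS))
    conc, results = asyncio.run(run_concurrent(ITEMS))
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s, results: {results}")
```

The sequential version should take roughly the sum of the delays, while the concurrent one should take roughly the longest single delay. Note that gather returns the results in the order the tasks were passed in, not in the order they completed.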

    If you are on Python >=3.11, you can use an asyncio.TaskGroup context manager instead:

    from asyncio import TaskGroup, run, sleep
    
    ...
    
    async def run_tests() -> None:
        last_finished: str | None = None
        async with TaskGroup() as task_group:
            for text, delay in texts:
                task_group.create_task(store(text, delay=delay))
                if not delay:
                    last_finished = text
        assert last_finished == "are"
    

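    But I have to say the variable naming you use is a bit confusing, because last_finished suggests that the coroutine with the "are" text has already finished when the variable is assigned, and that it finished after the ones scheduled before it. That is not true in general! Creating a task only schedules the coroutine; it has not run yet at that point.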

    Once a coroutine is scheduled, there are no inherent guarantees that it will finish before or after some other scheduled coroutine, no matter which was scheduled first. To ensure that, you would have to make use of some sort of synchronization primitive.
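A small sketch of this, under assumed delays: each call appends its text to a shared list when it completes, so the list records the actual completion order. The float delay parameter and the finished list are assumptions for illustration and are not part of the original store function.

```python
import asyncio


async def store(text: str, delay: float, finished: list[str]) -> None:
    # Hypothetical stand-in: sleep, then record that this call completed.
    await asyncio.sleep(delay)
    finished.append(text)


async def run_tests() -> tuple[list[str], list[str]]:
    # Assumed delays, chosen so that the first scheduled task is the slowest.
    items = [("how", 0.2), ("are", 0.1), ("you", 0.0)]
    finished: list[str] = []
    tasks = [asyncio.create_task(store(t, d, finished)) for t, d in items]
    await asyncio.gather(*tasks)
    scheduled = [t for t, _ in items]
    return scheduled, finished


if __name__ == "__main__":
    scheduled, finished = asyncio.run(run_tests())
    print("scheduled:", scheduled)
    print("finished: ", finished)
```

With these delays the completion order is the reverse of the scheduling order. If you need to know what actually finished last, record it when the coroutine completes (here, finished[-1]) rather than when it is scheduled.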

    Depending on the actual context, this may be especially important because you were talking about writing things to a database. But in this super simple example the order is likely to hold true.
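If the write order does matter, one possible approach is to chain the tasks with asyncio.Event objects, so that each write waits until the previously scheduled one has committed. This is only a sketch under assumptions: the log list stands in for the database, and the parameters previous_done and done as well as the helper run_ordered are hypothetical names introduced here.

```python
import asyncio
from typing import Optional


async def store(
    text: str,
    delay: float,
    log: list,
    previous_done: Optional[asyncio.Event],
    done: asyncio.Event,
) -> None:
    # The simulated work may finish in any order...
    await asyncio.sleep(delay)
    # ...but the write itself waits for the previously scheduled write.
    if previous_done is not None:
        await previous_done.wait()
    log.append(text)  # stand-in for the actual database write
    done.set()        # let the next write in the chain proceed


async def run_ordered(items) -> list:
    log: list = []
    tasks = []
    previous_done: Optional[asyncio.Event] = None
    for text, delay in items:
        done = asyncio.Event()
        tasks.append(
            asyncio.create_task(store(text, delay, log, previous_done, done))
        )
        previous_done = done
    await asyncio.gather(*tasks)
    return log


# Assumed delays: the first call is the slowest.
ITEMS = [("how", 0.1), ("are", 0.0), ("you", 0.05)]

if __name__ == "__main__":
    print(asyncio.run(run_ordered(ITEMS)))
```

The sleeps still overlap, so the calls run concurrently, but the writes land in scheduling order. The trade-off is that one slow call holds back every write scheduled after it.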