I'm having trouble getting this to run the way it's supposed to.
I have a store function, which is supposed to simulate writing to a database. Currently I'm writing to a text file instead. The function takes a boolean that is used to introduce a delay.
import asyncio
import os
import time

DB_FILE = "db.txt"  # assumed path; the snippet as posted does not define DB_FILE


async def store(text, delay: bool = False):
    mode = "a" if os.path.exists(DB_FILE) else "w"
    if delay:
        await asyncio.sleep(5)
    with open(DB_FILE, mode=mode) as f:
        timestamp = str(time.time())
        row = f"{text}\t{timestamp}\n"
        f.write(row)
        f.flush()
    print("done saving")
I then have a function that simulates writing several changes to that database and then gets the latest one. If the last store call has a delay, it won't count as "latest".
texts = [("how", False), ("are", False), ("you", True)]


async def run_tests():
    initial_text = "hi"
    await store(initial_text)
    curr_text = initial_text
    last_finished = initial_text
    for text, delay in texts:
        curr_text = text  # track the text currently being stored
        await store(text, delay=delay)
        if not delay:  # if finished
            last_finished = curr_text
    assert last_finished == "are"


if __name__ == "__main__":
    asyncio.run(run_tests())
I tried running this with both asyncio.run() and loop.run_until_complete(), but neither works. asyncio.sleep still behaves like time.sleep.
Ideally I want the line await store(text, delay=delay) to execute asynchronously while the rest of the function run_tests keeps going.
What am I doing wrong?
It is not asyncio.sleep that is blocking you here. It is that you await each store coroutine in your for-loop. That means you literally wait for it to finish executing before continuing with the next iteration.
If you want to schedule each coroutine immediately, but not wait for it to finish, you can use asyncio.create_task on them. Just make sure to await all the tasks at some point.
Here is a simplified example:
from asyncio import create_task, gather, run, sleep


async def store(text: str, delay: bool = False) -> None:
    print(text)
    if delay:
        await sleep(5)


texts = [
    ("hi", False),
    ("how", False),
    ("are", False),
    ("you", True),
]


async def run_tests() -> None:
    last_finished: str | None = None
    tasks = []
    for text, delay in texts:
        tasks.append(create_task(store(text, delay=delay)))
        if not delay:
            last_finished = text
    await gather(*tasks)
    assert last_finished == "are"


if __name__ == "__main__":
    run(run_tests())
I used asyncio.gather here for convenience to await all the tasks at the end.
If you are on Python >=3.11, you can use an asyncio.TaskGroup context manager instead:
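As a small aside on what gather gives you back, here is a sketch with a hypothetical variant of store that returns a value. The return value, the float delays, and the main helper are assumptions for illustration only.

```python
import asyncio


async def store(text: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return text.upper()


async def main() -> list:
    tasks = [
        asyncio.create_task(store("how", 0.02)),  # finishes second
        asyncio.create_task(store("are", 0.01)),  # finishes first
    ]
    # gather() waits for every task and returns the results in the order
    # the awaitables were passed in, not in the order they completed.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['HOW', 'ARE']
```

So the result list lines up with the list of tasks you passed in, even though the second task completes before the first one.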
from asyncio import TaskGroup, run, sleep

...


async def run_tests() -> None:
    last_finished: str | None = None
    async with TaskGroup() as task_group:
        for text, delay in texts:
            task_group.create_task(store(text, delay=delay))
            if not delay:
                last_finished = text
    assert last_finished == "are"
But I have to say the variable naming you use is a bit confusing, because last_finished suggests that the last non-delayed coroutine to be scheduled (the one with the "are" text) is also the last one to finish. That is not true in general!
Once a coroutine is scheduled, there are no inherent guarantees that it will finish before or after some other scheduled coroutine, no matter which was scheduled first. To ensure that, you would have to make use of some sort of synchronization primitive.
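As a rough sketch of what that could look like, here is one option using asyncio.Lock. The helper names (store_locked, run_unordered, run_locked), the JOBS list, the done list, and the short delays are made up for illustration. It relies on the documented fairness of asyncio.Lock (the coroutine that proceeds is the one that started waiting first), and it serializes the writes, so you trade the concurrency for a guaranteed order.

```python
import asyncio

# Hypothetical jobs: the first one scheduled has the longest delay.
JOBS = [("how", 0.03), ("are", 0.02), ("you", 0.01)]


async def store(text: str, delay: float, done: list) -> None:
    await asyncio.sleep(delay)
    done.append(text)  # record the completion order


async def store_locked(text: str, delay: float, done: list, lock: asyncio.Lock) -> None:
    # Only one store() runs at a time; waiters are served first come, first served.
    async with lock:
        await store(text, delay, done)


async def run_unordered() -> list:
    done = []
    await asyncio.gather(*(store(t, d, done) for t, d in JOBS))
    return done


async def run_locked() -> list:
    done = []
    lock = asyncio.Lock()  # created while the event loop is running
    await asyncio.gather(*(store_locked(t, d, done, lock) for t, d in JOBS))
    return done


if __name__ == "__main__":
    print(asyncio.run(run_unordered()))  # ['you', 'are', 'how']
    print(asyncio.run(run_locked()))     # ['how', 'are', 'you']
```

Without the lock, the shortest delay finishes first regardless of scheduling order. With it, the stores complete in the order they were scheduled, at the cost of running one after another.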
Depending on the actual context, this may be especially important because you were talking about writing things to a database. But in this super simple example the order is likely to hold true.