I am working on a sample program that reads from a data source (CSV or RDBMS) in chunks, applies some transformation, and sends the result via a socket to a server.
Because the CSV is very large, for testing purposes I want to stop reading after a few chunks. Unfortunately something goes wrong, and I do not know what it is or how to fix it. I probably have to do some cancellation, but I am not sure where or how. I get the following error:
Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>
The sample code is:
import asyncio
import json

async def readChunks():
    # this is basically a dummy alternative for reading csv in chunks
    df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
    for chunk in df:
        await asyncio.sleep(0.001)
        yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)

async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Many async resources, such as generators, need to be cleaned up with the help of an event loop. When an async for loop stops iterating an async generator via break, the generator is cleaned up by the garbage collector only. This means the task is pending (waits for the event loop) but gets destroyed (by the garbage collector).
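To see that break by itself does not run the generator's cleanup code, here is a minimal sketch. The names read_chunks and events are illustrative and not from the question, and the finally block stands in for whatever cleanup a real reader would need. It calls aclose() by hand only to show the point at which cleanup actually happens.

```python
import asyncio

events = []  # illustrative log of what happened, in order


async def read_chunks():
    # Hypothetical stand-in for the chunked reader from the question.
    try:
        for n in range(10):
            await asyncio.sleep(0)
            yield n
    finally:
        # Runs only when the generator is exhausted or closed.
        events.append("generator cleaned up")


async def main():
    gen = read_chunks()
    async for n in gen:
        if n >= 2:
            break  # leaves the generator suspended at its yield
    events.append("loop exited")
    await gen.aclose()  # cleanup runs here, inside the event loop
    events.append("aclose returned")


asyncio.run(main())
print(events)
# ['loop exited', 'generator cleaned up', 'aclose returned']
```

The order of the log shows that leaving the loop did not trigger the finally block; only the explicit close did.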
The most straightforward fix is to aclose the generator explicitly:
async def main():
    i = 0
    aiter = readChunks()  # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done
These patterns can be simplified using the asyncstdlib library (disclaimer: I maintain this library). asyncstdlib.islice takes a fixed number of items and then cleanly closes the generator:
import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...
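If you would rather not add a dependency, the idea of taking n items and then closing the source can be sketched by hand. The helper take below is hypothetical and is not how asyncstdlib implements islice; it only illustrates the behaviour described above, and it assumes the caller iterates it to the end.

```python
import asyncio

closed = []  # illustrative flag showing that the source was cleaned up


async def read_chunks():
    # Hypothetical stand-in for the chunked reader from the question.
    try:
        for n in range(10):
            await asyncio.sleep(0)
            yield n
    finally:
        closed.append(True)


async def take(agen, n):
    """Yield at most n items from agen, then close it (hypothetical helper)."""
    try:
        if n > 0:
            count = 0
            async for item in agen:
                yield item
                count += 1
                if count >= n:
                    break
    finally:
        # Close the source while the event loop is still available.
        await agen.aclose()


async def main():
    return [item async for item in take(read_chunks(), 3)]


result = asyncio.run(main())
print(result, closed)
# [0, 1, 2] [True]
```

Note that take is itself an async generator, so breaking out of it early would raise the same cleanup question one level up; that is why the sketch consumes it completely.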
If the break condition is dynamic, scoping the iterator guarantees cleanup in any case:
import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break