Tags: python, asynchronous, python-asyncio, coroutine, event-loop

asyncio task was destroyed but it is pending


I am working on a sample program that reads from a data source (CSV or RDBMS) in chunks, applies some transformation, and sends the data via a socket to a server.

But because the CSV is very large, for testing purposes I want to stop reading after a few chunks. Unfortunately something goes wrong and I do not know what it is or how to fix it. Probably I have to do some cancellation, but I am not sure where and how. I get the following error:

Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>

The sample code is:

import asyncio
import json

async def readChunks():
    # this is basically a dummy alternative for reading csv in chunks
    df = [{"chunk_" + str(x): [r for r in range(10)]} for x in range(10)]
    for chunk in df:
        await asyncio.sleep(0.001)
        yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)


async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Solution

  • Many async resources, such as generators, need to be cleaned up with the help of an event loop. When an async for loop stops iterating an async generator via break, the generator is cleaned up only by the garbage collector. As a result, the cleanup task (the async_generator_athrow from the error message) is pending (it waits for the event loop) but gets destroyed (by the garbage collector).

    The most straightforward fix is to aclose the generator explicitly:

    async def main():
        i = 0
        aiter = readChunks()      # name iterator in order to ...
        try:
            async for chunk in aiter:
                ...
                i += 1
                if i > 5:
                    break
        finally:
            await aiter.aclose()  # ... clean it up when done
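
    If you are on Python 3.10 or newer, contextlib.aclosing from the standard library wraps this same try/finally pattern in an async context manager. A minimal sketch of that variant:

    from contextlib import aclosing  # in the standard library since Python 3.10

    async def main():
        i = 0
        async with aclosing(readChunks()) as aiter:  # aclose() is awaited on exit
            async for chunk in aiter:
                ...
                i += 1
                if i > 5:
                    break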
    

    These patterns can be simplified using the asyncstdlib library (disclaimer: I maintain this library). asyncstdlib.islice lets you take a fixed number of items before cleanly closing the generator:

    import asyncstdlib as a
    
    async def main():
        async for chunk in a.islice(readChunks(), 5):
            ...
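
    Here islice consumes at most 5 chunks and then closes the wrapped generator before the loop ends, so no explicit aclose call is needed. (Note that the question's original loop processes 6 chunks before breaking, so you may want islice(readChunks(), 6) to match it exactly.)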
    

    If the break condition is dynamic, scoping the iterator guarantees cleanup in any case:

    import asyncstdlib as a
    
    async def main():
        async with a.scoped_iter(readChunks()) as aiter:
            async for idx, chunk in a.enumerate(aiter):
                ...
                if idx >= 5:
                    break
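
    For reference, here is one way the question's full program could look with the islice fix applied, using asyncio.run as the entry point; asyncio.run manages the event loop and also finalizes any remaining async generators before closing it. This is a self-contained sketch, not the only possible arrangement:

    import asyncio
    import json

    import asyncstdlib as a

    async def readChunks():
        # dummy stand-in for reading a csv in chunks
        df = [{"chunk_" + str(x): [r for r in range(10)]} for x in range(10)]
        for chunk in df:
            await asyncio.sleep(0.001)
            yield chunk

    async def send(row):
        j = json.dumps(row)
        print(f"to be sent: {j}")
        await asyncio.sleep(0.001)

    async def main():
        # islice closes readChunks() once 6 chunks have been consumed
        async for chunk in a.islice(readChunks(), 6):
            for k, v in chunk.items():
                await send({k: v})  # gather() around a single coroutine adds nothing here

    asyncio.run(main())  # finalizes async generators before closing the loop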