Search code examples
pythonpytestpython-asynciopytest-asyncio

Task was destroyed but it is pending! even after awaiting it


I wrote the following factory fixture that creates a ReconnectingConsumer and then awaits it to stop:

@pytest.fixture
def make_consumer():
    async def _make_consumer():
        consumer = ReconnectingConsumer(
            amqp_url=AMQP_URL, queue=INPUT_QUEUE, on_message=None
        )
        yield consumer
        await consumer.stop()
        print('Consumer stopped')

    return _make_consumer

So I am using it to create a new consumer, running it and then cancelling it to stop:

@pytest.mark.asyncio
async def test_pending_todo_set(make_consumer, make_publisher, make_dispatcher_service):
    make_consumer_gen = make_consumer()
    consumer_1: ReconnectingConsumer = await anext(make_consumer_gen)

    tasks: List[asyncio.Task] = []
    tasks.append(asyncio.create_task(consumer_1.run()))

    for t in tasks:
        t.cancel()

    _ = await asyncio.gather(*tasks, return_exceptions=True)

However, no matter all the tasks are awaited, I still get the error:

2024-05-22 12:30:13,673 | ERROR | base_events:1819 | Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<<async_generator_athrow without __name__>()>>

Solution

  • The reason it happens is that you didn't execute aiter to get async iterator from function. So it could be like this:

    @pytest.mark.asyncio
    async def test_pending_todo_set(make_consumer, make_publisher, make_dispatcher_service):
        make_consumer_gen = make_consumer()
        async_iterator = aiter(make_consumer_gen())
        consumer_1: ReconnectingConsumer = await anext(async_iterator)
    

    But the problem here is that you need to call anext again to go back to yield, and there will be StopIteration exception.

    Another point is to use async for which wiil handle aiter and anext as well as StopIteration, but also it possible to use context manager like this and it should work:

    from contextlib import asynccontextmanager   
    
    @pytest.fixture
    def make_consumer():
        @asynccontextmanager
        async def _make_consumer():
            consumer = ReconnectingConsumer(
                amqp_url=AMQP_URL, queue=INPUT_QUEUE, on_message=None
            )
            yield consumer
            await consumer.stop()
            print('Consumer stopped')
    
        return _make_consumer
    
    @pytest.mark.asyncio
    async def test_pending_todo_set(make_consumer, make_publisher, make_dispatcher_service):
        make_consumer_gen = make_consumer()
        async with make_consumer_gen() as consumer_1:
    
            tasks: List[asyncio.Task] = []
            tasks.append(asyncio.create_task(consumer_1.run()))
    
            for t in tasks:
                t.cancel()
    
            _ = await asyncio.gather(*tasks, return_exceptions=True)
    

    One more option is to use @pytest_asyncio.fixture from pytest_asyncio and yield desiarable value from there in dummy code it could be:

    @pytest_asyncio.fixture(scope='session')
    async def get_my_value() -> AsyncIterator[SomeInstance]:
        async with SomeInstance() as value:
            # code before
            yield value
            # code after