Search code examples
python pytest python-asyncio pytest-asyncio

Execution Hang or Deadlock During `pytest` Using `asyncio.Queue` and `TaskGroup`


I am new to async programming and am trying to understand how `TaskGroup` can be used with `asyncio.Queue`. I have the module and test below. When I run pytest, it prints every queue item, but then it just hangs (deadlocks?). Any suggestions on what I am doing wrong?

Module: AsynchronousQueueBeta.py

from asyncio import Queue, TaskGroup

class AsynchronousQueueBeta:
    """Asynchronous Queue Beta"""

    async def fetch_recursive(self, source_list: list[str], maximum_connection: int = 10):
        """Fetch Recursive"""
        print('Fetch Recursive')
        query_queue = Queue()

        for source in source_list:
            query_queue.put_nowait(source)

        async with TaskGroup() as group:
            task_list = [
                group.create_task(self.fetch_query(query_queue)) for _ in range(maximum_connection)
            ]

        await query_queue.join()
        result_list = [task.result() for task in task_list]
        print(f'Result List: {result_list}')


    async def fetch_query(self, queue: Queue):
        """Fetch Query"""
        while True:
            query = await queue.get()
            print(f'Query: {query}')
            queue.task_done()

Test: TestAsynchronousQueueBeta.py

import pytest
from AsynchronousQueueBeta import AsynchronousQueueBeta

class TestAsynchronousQueueBeta():
    """Test Asynchronous Queue Beta"""

    # Upper bound, in seconds, for a single fetch_recursive run.  Without it a
    # hang in the code under test blocks the whole pytest session until it is
    # interrupted by hand.
    TIMEOUT_SECONDS = 30

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        'source_list', [
            [
                'https://httpbin.org/anything/1',
                'https://httpbin.org/anything/2',
                'https://httpbin.org/anything/3',
                'https://httpbin.org/anything/4',
                'https://httpbin.org/anything/5',
                'https://httpbin.org/anything/6',
                'https://httpbin.org/anything/7',
                'https://httpbin.org/anything/8',
                'https://httpbin.org/anything/9',
                'https://httpbin.org/anything/10',
                'https://httpbin.org/anything/11',
                'https://httpbin.org/anything/12',
            ],
        ]
    )
    async def test_fetch_recursive(self, source_list: list[str]):
        """Run fetch_recursive over the parametrized sources.

        The call is wrapped in a timeout so that a deadlock surfaces as a
        failing test (``TimeoutError``) instead of a run that never ends.
        """
        # Imported locally so this class does not depend on a module-level
        # ``import asyncio`` being present in the test file.
        import asyncio

        beta = AsynchronousQueueBeta()

        await asyncio.wait_for(
            beta.fetch_recursive(
                source_list=source_list,
            ),
            timeout=self.TIMEOUT_SECONDS,
        )

Result

platform darwin -- Python 3.12.1, pytest-7.4.4, pluggy-1.3.0 -- /Users/abc/Desktop/Project/Workspace/Python/pv312/bin/python3.12
cachedir: .pytest_cache
rootdir: /Users/abc/Desktop/Project/Async
configfile: pytest.ini
plugins: asyncio-0.23.3, anyio-4.2.0
asyncio: mode=Mode.STRICT
collected 1 item

Test/TestAsynchronousQueueBeta.py::TestAsynchronousQueueBeta::test_fetch_recursive[source_list0] Fetch Recursive
Query: https://httpbin.org/anything/1
Query: https://httpbin.org/anything/2
Query: https://httpbin.org/anything/3
Query: https://httpbin.org/anything/4
Query: https://httpbin.org/anything/5
Query: https://httpbin.org/anything/6
Query: https://httpbin.org/anything/7
Query: https://httpbin.org/anything/8
Query: https://httpbin.org/anything/9
Query: https://httpbin.org/anything/10
Query: https://httpbin.org/anything/11
Query: https://httpbin.org/anything/12
^C

!!! KeyboardInterrupt !!!
/opt/python/3.12.1/lib/python3.12/selectors.py:566: KeyboardInterrupt
(to show a full traceback on KeyboardInterrupt use --full-trace)
...

Solution

  • I think I figured it out (in a sense): I put a signal, or sentinel value, on the queue to tell each worker when the queue is done. Here's an example; I also have a full example with a test.

    async def fetch_recursive(
        self,
        source_list: list[str],
        maximum_task: int = 10,
    ):
        print('Fetch Recursive')
        query_queue = Queue()
        result_queue = Queue()
    
        async with TaskGroup() as group:
            task_list = [
                group.create_task(
                    self.fetch_query(
                        name=f'Worker-{index + 1}',
                        query_queue=query_queue,
                        result_queue=result_queue,
                    )
                ) for index in range(maximum_task)
            ]
    
            for source in source_list:
                await query_queue.put(source)
    
            for _ in range(maximum_task):
                await query_queue.put(None)
    
        result_list = []
        while not result_queue.empty():
            result = await result_queue.get()
            result_list.append(result)
    
        # Display the result(s)
        for result in result_list:
            print(f'Result: {result}')
    
    async def fetch_query(
        self,
        name: str,
        query_queue: Queue,
        result_queue: Queue,
    ):
        """Process queries from ``query_queue`` until a ``None`` sentinel arrives.

        For each query, a one-second sleep takes the place of the actual
        fetch, and a result string tagged with the worker ``name`` is put on
        ``result_queue``.
        """
        # ``None`` is the shutdown signal; any other item is a query to handle.
        while (query := await query_queue.get()) is not None:
            print(f'{name} Fetch Query: {query}')

            await asyncio.sleep(1)

            await result_queue.put(f'{name} Result: {query}')