Search code examples
pythonunit-testingmockingpython-unittest

AsyncMock coroutines never actually yield control?


I'm trying to test some asynchronous code I wrote. The structure is:

  • A worker object is responsible for receiving a task and asynchronously executing it, often awaiting on some asynchronous IO call
  • Sitting above each worker object is a loop that reads from a shared queue, passing tasks down to the worker as each task becomes available. Note that this queue is persistent, and in reality I'm using a blocking persistent queue library.
  • At the very top is an asyncio.gather call, waiting on each of the inner loops. I've constructed a toy example:
import asyncio


async def loop_inner(worker, worker_num, tasks):
    while tasks:
        task = tasks.pop()
        print('Worker number {} handled task number {}'.format(
            worker_num, await worker.do_work(task)))


async def loop(workers, tasks):
    tasks = [loop_inner(worker, worker_num, tasks) for
                     worker_num, worker in enumerate(workers)]
    await asyncio.gather(*tasks)

When run on a real workload, the structure works great. High throughput, good use of parallelism, etc.

The problem is when I want to test it. I'd like to write tests that mimic the distribute of tasks across the various workers. However, I want to test just the distribution logic, while mocking out the worker code itself. My natural impulse is to replace real workers with AsyncMock objects. Problem is, when I run this test case, all the work is being handled by a single worker:

from unittest import IsolatedAsyncioTestCase, main
from unittest.mock import ANY, AsyncMock, patch

class TestCase(IsolatedAsyncioTestCase):
    async def test_yielding(self):
        tasks = list(range(10))
        workers = [AsyncMock() for i in range(2)]
        for worker in workers:
            worker.do_work.side_effect = lambda task: task

        await loop(workers, tasks)

main()

My output is as follows:

Worker number 0 handled task number 9
Worker number 0 handled task number 8
Worker number 0 handled task number 7
Worker number 0 handled task number 6
Worker number 0 handled task number 5
Worker number 0 handled task number 4
Worker number 0 handled task number 3
Worker number 0 handled task number 2
Worker number 0 handled task number 1
Worker number 0 handled task number 0

What gives? Why is just one worker handling all the work? Is the event loop not passing control off to the other worker? Is the AsyncMock coroutine not really yielding control? What can I do to more realistically test this?


Solution

  • Why is just one worker handling all the work?

    Scheduler will never interrupt upon quantum expiration, as we are using cooperative threads here. So cooperate.

    Computing list(range(10)) never yields control.

    Voluntarily yielding control with a brief sleep in the lambda, or structuring it as an async function, gives the scheduler an opportunity to keep more than one plate spinning simultaneously.