I'm trying to test some asynchronous code I wrote. The structure is: a set of inner loops, each `await`ing on some asynchronous IO call, and an outer `asyncio.gather` call waiting on each of the inner loops.
I've constructed a toy example:

```python
import asyncio


async def loop_inner(worker, worker_num, tasks):
    while tasks:
        task = tasks.pop()
        print('Worker number {} handled task number {}'.format(
            worker_num, await worker.do_work(task)))


async def loop(workers, tasks):
    tasks = [loop_inner(worker, worker_num, tasks) for
             worker_num, worker in enumerate(workers)]
    await asyncio.gather(*tasks)
```
When run on a real workload, the structure works great. High throughput, good use of parallelism, etc.
The problem is when I want to test it. I'd like to write tests that mimic the distribution of tasks across the various workers. However, I want to test just the distribution logic, while mocking out the worker code itself. My natural impulse is to replace the real workers with `AsyncMock` objects. The problem is, when I run this test case, all the work is handled by a single worker:
```python
from unittest import IsolatedAsyncioTestCase, main
from unittest.mock import ANY, AsyncMock, patch


class TestCase(IsolatedAsyncioTestCase):
    async def test_yielding(self):
        tasks = list(range(10))
        workers = [AsyncMock() for i in range(2)]
        for worker in workers:
            worker.do_work.side_effect = lambda task: task
        await loop(workers, tasks)


main()
```
My output is as follows:

```
Worker number 0 handled task number 9
Worker number 0 handled task number 8
Worker number 0 handled task number 7
Worker number 0 handled task number 6
Worker number 0 handled task number 5
Worker number 0 handled task number 4
Worker number 0 handled task number 3
Worker number 0 handled task number 2
Worker number 0 handled task number 1
Worker number 0 handled task number 0
```
What gives? Why is just one worker handling all the work? Is the event loop not passing control off to the other worker? Is the `AsyncMock` coroutine not really yielding control? What can I do to test this more realistically?
> Why is just one worker handling all the work?
The scheduler will never interrupt upon quantum expiration, as we are using cooperative threads here. So cooperate.
Computing `list(range(10))` never yields control: the `lambda` side effect returns immediately, so awaiting the mock completes without ever suspending.
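To see the effect in isolation, here is a minimal sketch (the names `drain` and `order` are mine, mirroring the structure of `loop_inner`): neither coroutine ever awaits anything that suspends, so whichever one the event loop starts first drains the shared list before the other gets a turn.

```python
import asyncio

order = []


async def drain(name, items):
    # There is no await inside this loop, so the coroutine never
    # suspends: once started, it runs until the list is empty.
    while items:
        items.pop()
        order.append(name)


async def main():
    items = list(range(4))
    # gather() schedules both coroutines, but 'a' runs to completion
    # before 'b' ever sees the list.
    await asyncio.gather(drain('a', items), drain('b', items))


asyncio.run(main())
print(order)  # 'a' handled every item; 'b' found the list empty
```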
Voluntarily yielding control with a brief `sleep` in the side effect, by structuring it as an async function, gives the scheduler an opportunity to keep more than one plate spinning simultaneously.
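A sketch of the fix: replace the `lambda` with an async side effect that awaits `asyncio.sleep(0)` before returning. `AsyncMock` awaits an async `side_effect` function, and the sleep hands control back to the event loop. (Here I record results in a `handled_by` list instead of printing, purely for illustration; `loop`/`loop_inner` are re-created from the question.)

```python
import asyncio
from unittest.mock import AsyncMock

handled_by = []  # (worker_num, task) pairs, for inspection


async def loop_inner(worker, worker_num, tasks):
    while tasks:
        task = tasks.pop()
        handled_by.append((worker_num, await worker.do_work(task)))


async def loop(workers, tasks):
    inner = [loop_inner(worker, worker_num, tasks)
             for worker_num, worker in enumerate(workers)]
    await asyncio.gather(*inner)


async def do_work(task):
    await asyncio.sleep(0)  # voluntarily yield to the event loop
    return task


workers = [AsyncMock() for _ in range(2)]
for worker in workers:
    worker.do_work.side_effect = do_work

asyncio.run(loop(workers, list(range(10))))
print({w for w, _ in handled_by})  # both workers now participate
```

With the yield point in place, the two inner loops alternate: worker 0 pops a task and suspends at the sleep, worker 1 pops the next one, and so on.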