Tags: python, asynchronous, concurrency, celery, celery-task

Run celery tasks concurrently using pytest


I'm trying to integration test a concurrent celery task in my Django app. I want the task to actually run concurrently on a worker during the pytest integration test but I'm having trouble making that work.

Let's say I have the following basic celery task:

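import time

from celery import shared_task


@shared_task
def sleep_task(secs):
    """Sleep for `secs` seconds, printing a counter after each second."""
    print(f"Sleeping for {secs} seconds...")
    for i in range(secs):
        time.sleep(1)  # sleep one second per iteration
        print(f"\t{i + 1}")
    return "DONE"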

Running the task concurrently in a script works fine:

# script
sleep_task.delay(3)
sleep_task.delay(3)

# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3

However, I can't reproduce this concurrent behavior in a test. In conftest.py I set up the broker and the result backend:

import pytest

pytest_plugins = ("celery.contrib.pytest",)

@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "memory://", "result_backend": "rpc://"}

And here is my test. The celery_session_app and celery_session_worker fixtures set up the celery app and worker used for testing (docs):

def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
    sleep_task.delay(3)
    sleep_task.delay(3)

# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3

It appears the tasks are running one after the other rather than concurrently. Is there any way to run the tasks concurrently?


Solution

  • The celery_session_worker fixture starts a celery worker with a concurrency of 1. This value is currently hardcoded in celery.contrib and is not configurable. With a concurrency of 1, the worker processes only one task at a time, which is why the two tasks run back to back.

    An alternative is to write your own fixture that starts a worker with a concurrency of 2 or more.