I'm trying to integration-test a concurrent Celery task in my Django app. I want the task to actually run concurrently on a worker during the pytest integration test, but I'm having trouble making that work.
Let's say I have the following basic Celery task:
import time

from celery import shared_task

@shared_task
def sleep_task(secs):
    print(f"Sleeping for {secs} seconds...")
    for i in range(secs):
        time.sleep(1)  # one second per iteration, secs seconds total
        print(f"\t{i + 1}")
    return "DONE"
Running the task concurrently in a script works fine:
# script
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
    1
    1
    2
    2
    3
    3
However, I can't reproduce this concurrent behavior in a test. In conftest.py
I set up the broker and the result backend:
import pytest

pytest_plugins = ("celery.contrib.pytest",)

@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "memory://", "result_backend": "rpc://"}
And here is my test. The celery_session_app and celery_session_worker fixtures set up the Celery app and worker used for testing (docs):
def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
    sleep_task.delay(3)
    sleep_task.delay(3)
# output
Sleeping for 3 seconds...
    1
    2
    3
Sleeping for 3 seconds...
    1
    2
    3
It appears the tasks are running sequentially rather than concurrently. Is there any way to make them run concurrently?
The celery_session_worker fixture starts a Celery worker with a concurrency of 1. This default is hardcoded in celery.contrib.testing and is not currently configurable through the fixture. Since the concurrency is 1, the worker processes only one task at a time, which is why your two calls run back to back.
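For reference, the defaults are visible in the signature of start_worker, the helper the built-in worker fixtures call internally. Roughly (an abbreviated sketch of celery.contrib.testing.worker.start_worker; exact parameters may differ across Celery versions):

from contextlib import contextmanager

@contextmanager
def start_worker(app, concurrency=1, pool="solo", **kwargs):
    # abbreviated sketch; the relevant defaults are a single slot and
    # the "solo" pool, which executes one task at a time
    ...

Note that the "solo" pool matters as much as the concurrency value: it runs tasks one by one regardless of what concurrency is set to.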
An alternative is to write your own fixture that starts a worker with a concurrency of 2 or more, as sketched below.
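Here is a minimal sketch of such a fixture, assuming start_worker from celery.contrib.testing.worker (the fixture name celery_concurrent_worker is made up). The "threads" pool is requested explicitly because the default "solo" pool would still execute one task at a time, and the ping check is disabled since the celery.ping task from celery.contrib.testing.tasks is not registered here:

import pytest
from celery.contrib.testing.worker import start_worker

@pytest.fixture(scope="session")
def celery_concurrent_worker(celery_session_app):
    # Start a worker with a thread pool of size 2 so two tasks can
    # execute at the same time.
    with start_worker(
        celery_session_app,
        pool="threads",
        concurrency=2,
        perform_ping_check=False,
    ) as worker:
        yield worker

The test then requests this fixture instead of celery_session_worker; waiting on the results keeps the test alive until both tasks have finished:

def test_concurrent_sleep_tasks(celery_session_app, celery_concurrent_worker):
    result_1 = sleep_task.delay(3)
    result_2 = sleep_task.delay(3)
    assert result_1.get(timeout=10) == "DONE"
    assert result_2.get(timeout=10) == "DONE"

With a pool of two threads, the worker picks up both tasks immediately and the counting output interleaves as in the script run.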