I have a Celery task that creates model entries in the database. Celery uses Redis as a broker.
I created a new config for tests in test_settings.py
(the application runs in Docker, so I'm changing the host for the database and Redis):
# test_settings.py
from .settings import *
DATABASES = {
    'default': {
        'ENGINE': env('POSTGRES_ENGINE'),
        'NAME': env('POSTGRES_DB'),
        'USER': env('POSTGRES_USER'),
        'PASSWORD': env('POSTGRES_PASSWORD'),
        'HOST': 'localhost',
        'PORT': env('POSTGRES_PORT')
    }
}
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://localhost:6379/1',
    }
}
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
# tasks.py
from .models import Message
from itertools import islice
from celery import shared_task
@shared_task(ignore_result=True)
def create_entries(data: list):
    batch_size = 100
    obj_iterator = (Message(**obj) for obj in data)
    while True:
        batch = list(islice(obj_iterator, batch_size))
        if not batch:
            break
        Message.objects.bulk_create(batch, batch_size)
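The batching loop in create_entries can be checked without a database. This is a minimal sketch, assuming plain integers stand in for Message instances; the helper name iter_batches is hypothetical and not part of the project:

```python
from itertools import islice


def iter_batches(items, batch_size):
    """Yield consecutive lists of at most batch_size items."""
    iterator = iter(items)
    while True:
        # islice consumes up to batch_size items from the shared iterator
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        yield batch


# 250 items split into batches of 100
batches = list(iter_batches(range(250), 100))
sizes = [len(batch) for batch in batches]
```

Each yielded batch is what the task would pass to bulk_create; the last batch is simply shorter when the input does not divide evenly.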
In the application everything works fine, but the tests don't work. Test method code:
# tests/test_celery_tasks.py
import tasks
import pytest
from .models import Message
from celery.result import AsyncResult
@pytest.mark.django_db
@pytest.mark.celery
def test_create_entries(celery_worker):
    message_data = [
        {
            'text': 'hello bro',
            'client': 'Nick'
        },
    ]
    assert Message.objects.count() == 0
    task = tasks.create_entries.delay(message_data)
    result = AsyncResult(task.task_id)
    assert result.status == 'SUCCESS'
    assert Message.objects.count() == 1
I get an error:
ERROR service/tests/test_celery_tasks.py::test_create_entries[Message] - django.db.utils.InterfaceError: connection already closed
If I add transaction=True, this error disappears, but then the assertions fail:
@pytest.mark.django_db(transaction=True)
@pytest.mark.celery
def test_create_entries(celery_worker):
    # the same code
    assert result.status == 'SUCCESS'  # Fail, status=PENDING
    assert Message.objects.count() == 1  # Fail, count=0
I have no idea why the tasks don't work in tests.
Add these settings to your test_settings.py:
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True
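and always run the tests this way. With these settings, Celery runs tasks eagerly: calling the delay method executes the task synchronously in the test process instead of sending it to a worker, and exceptions raised by the task propagate to the test.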