python django postgresql pytest celery

Pytest with Django Celery: ERROR django.db.utils.InterfaceError: connection already closed


I have a Celery task which creates model entries in the database. Celery uses Redis as a broker.

I created a separate config for tests in test_settings.py (the application runs in Docker, so I'm changing the host for the database and Redis):

# test_settings.py

from .settings import *

DATABASES = {
    'default': {
        'ENGINE': env('POSTGRES_ENGINE'),
        'NAME': env('POSTGRES_DB'),
        'USER': env('POSTGRES_USER'),
        'PASSWORD': env('POSTGRES_PASSWORD'),
        'HOST': 'localhost',
        'PORT': env('POSTGRES_PORT')
    }
}
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://localhost:6379/1',
    }
}

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'

# tasks.py

from .models import Message
from itertools import islice
from celery import shared_task

@shared_task(ignore_result=True)
def create_entries(data: list):
    batch_size = 100
    obj_iterator = (Message(**obj) for obj in data)
    while True:
        batch = list(islice(obj_iterator, batch_size))
        if not batch:
            break
        Message.objects.bulk_create(batch, batch_size)

In the application everything works fine, but the tests fail. Test code:

# tests/test_celery_tasks.py

import tasks
import pytest
from .models import Message
from celery.result import AsyncResult

@pytest.mark.django_db
@pytest.mark.celery
def test_create_entries(celery_worker):
    message_data = [
        {
            'text': 'hello bro',
            'client': 'Nick'
        },
    ]
    assert Message.objects.count() == 0
    task = tasks.create_entries.delay(message_data)
    result = AsyncResult(task.task_id)
    assert result.status == 'SUCCESS'
    assert Message.objects.count() == 1

I get this error: ERROR service/tests/test_celery_tasks.py::test_create_entries[Message] - django.db.utils.InterfaceError: connection already closed

If I add transaction=True to the django_db marker, this error disappears, but the assertions fail:

@pytest.mark.django_db(transaction=True)
@pytest.mark.celery
def test_create_entries(celery_worker):
    # the same code
    assert result.status == 'SUCCESS' # Fail, status=PENDING
    assert Message.objects.count() == 1 # Fail, count=0

I have no idea why the tasks don't work in tests.


Solution

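  • Add these settings to your test_settings.py:

    CELERY_TASK_ALWAYS_EAGER = True
    CELERY_TASK_EAGER_PROPAGATES = True

    With CELERY_TASK_ALWAYS_EAGER, calling delay() runs the task synchronously in the calling process instead of sending it to the broker, so the task has already finished when delay() returns. With CELERY_TASK_EAGER_PROPAGATES, an exception raised inside the task is re-raised to the caller instead of being stored on the result. In this mode the test needs neither the celery_worker fixture nor transaction=True, because the task runs in the test process itself.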