python django unit-testing celery djcelery

How to properly test a scheduled task using djcelery


I have a periodic task using djcelery. The task looks into the db and makes some changes if necessary. As you know, "python manage.py test" creates a test db, so how can I run this periodic task against that test db?

I tried to run first: python manage.py test

then when I tried to run the following

python manage.py celery worker -l info &

python manage.py celery beat -l info &

the test is terminated


Solution

  • You don't need to run the Celery workers or celery beat; doing so would not be good unit testing. You are trying to test the task, so isolate the task and test it. Either run the tests with CELERY_ALWAYS_EAGER enabled, or simply call your task synchronously from inside the test.

    To avoid the asynchronous machinery, simply call your task like this:

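    # `celery` is assumed to be your project's Celery app instance
    @celery.task
    def task_to_test():
        print('stuff')


    def test():
        # Calling the task like a plain function runs it synchronously,
        # in the test process, without a worker or broker
        task_to_test()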
    

    If you cannot do that, override your settings so that tasks always run eagerly:

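    from django.test import TestCase
    from django.test.utils import override_settings

    from tasks import task_to_test


    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory',
                       CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}})
    class MyTests(TestCase):
        def test_task(self):
            # In eager mode, .delay() executes the task locally and blocks until it finishes
            task_to_test.delay()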
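
The idea of isolating the task and calling it synchronously can be sketched without Celery or Django at all. The example below is only an illustration under stated assumptions: `deactivate_expired`, the `subscription` table and its columns are hypothetical names, and an in-memory SQLite database stands in for the test db that "python manage.py test" creates. The point is that the test calls the task body directly and then checks what changed in the database.

```python
import sqlite3
import unittest


def deactivate_expired(conn, today):
    """Hypothetical task body: flag rows whose expiry date has passed."""
    cur = conn.execute(
        "UPDATE subscription SET active = 0 WHERE active = 1 AND expires < ?",
        (today,),
    )
    conn.commit()
    return cur.rowcount  # number of rows the task changed


class DeactivateExpiredTest(unittest.TestCase):
    def setUp(self):
        # In-memory database standing in for Django's test database (assumption)
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE subscription "
            "(id INTEGER PRIMARY KEY, expires TEXT, active INTEGER)"
        )
        self.conn.executemany(
            "INSERT INTO subscription (expires, active) VALUES (?, ?)",
            [("2020-01-01", 1), ("2999-01-01", 1)],
        )

    def tearDown(self):
        self.conn.close()

    def test_only_expired_rows_change(self):
        # Call the task body directly: no worker, no beat, no broker
        changed = deactivate_expired(self.conn, today="2024-06-01")
        self.assertEqual(changed, 1)
        rows = self.conn.execute(
            "SELECT active FROM subscription ORDER BY id"
        ).fetchall()
        self.assertEqual(rows, [(0,), (1,)])
```

In a Django project the same shape would presumably use django.test.TestCase and the ORM, with the decorated Celery task being a thin wrapper around a function like this one, so the periodic schedule itself never needs to run during the test.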