Search code examples
pythoncelerydjango-celery

Call Celery class-based task in sync mode


I have celery class-based task

from celery import Task
from django.db import transaction

from config import celery_app


class RefreshData(Task):
    name = "refresh-data"

    @transaction.atomic
    def run(self, *args, **kwargs):
        SomeClass().some_fuc()

celery_app.register_task(RefreshData)

Now I want to run it in sync mode (in tests). How to do that?


Solution

  • The trick is to mock.patch the call of the task in such way, as we call it in sync.

    To call task in sync RefreshData().run(*args, **kwargs)