python django celery celerybeat class-based-tasks

How to pass a class based task into CELERY_BEAT_SCHEDULE


As seen in the docs, class-based tasks are a fair way to express complex logic.

However, the docs do not specify how to add your shiny, newly created class-based task to your CELERY_BEAT_SCHEDULE (using Django).

Things I tried: celery.py

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from payments.tasks.generic.payeer import PayeerPaymentChecker
    from payments.tasks.generic.ok_pay import OkPayPaymentChecker

    okpay_import = OkPayPaymentChecker()
    payeer_imprt = PayeerPaymentChecker()

    sender.add_periodic_task(60.0, okpay_import.s(),
                             name='OkPay import',
                             expires=30)

    sender.add_periodic_task(60.0, payeer_imprt.s(),
                             name='Payeer import',
                             expires=30)

-- OR --

payments/task_summary.py

from datetime import timedelta
from payments.tasks.generic.ok_pay import OkPayPaymentChecker
from payments.tasks.generic.payeer import PayeerPaymentChecker

run_okpay = OkPayPaymentChecker()
run_payeer = PayeerPaymentChecker()

CELERY_BEAT_SCHEDULE = {
    # yes, I did try referring to the class here
    'check_okpay_payments': {
        'task': 'payments.tasks.task_summary.run_okpay',
        'schedule': timedelta(seconds=60),
    },
    'check_payeer_payments': {
        'task': 'payments.task_summary.run_payeer',
        'schedule': timedelta(seconds=60),
    },
}

I really don't know what to do, so I'm resorting to something like this: payments/task_summary.py

from payments.tasks.generic.ok_pay import OkPayPaymentChecker
from payments.tasks.generic.payeer import PayeerPaymentChecker
from celery import shared_task


@shared_task
def run_payeer():
    instance = PayeerPaymentChecker()
    return instance.run()


@shared_task
def run_okpay():
    instance = OkPayPaymentChecker()
    return instance.run()



Solution

  • It took me a while to find the answer to this too, and because this question ranks so high in Google search results, I figured I'd drop it here for people who are struggling to find the answer:

    You add it just as you would a normal task, but using the dotted path to the task class as the task name.

    from datetime import timedelta

    CELERY_BEAT_SCHEDULE = {
        'my_task_name': {
            'task': 'mymodule.tasks.MyTaskClass',
            'schedule': timedelta(seconds=60),
        },
    }

    This assumes you have mymodule/tasks.py with:

    from celery import Task

    class MyTaskClass(Task):

        def run(self, *args, **kwargs):
            pass  # ... stuff ...
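
Applied to the question, the same idea might look roughly like the sketch below. The dotted paths are assumptions: they are the module paths from the question's imports with the class name appended, which is the name a class-based task gets when it does not set an explicit `name`. The `options` entry carries over the `expires=30` from the `add_periodic_task` attempt; drop it if you do not need it.

```python
from datetime import timedelta

# Assumed task names: module path from the question + class name.
# Adjust these if your task classes define their own `name` attribute.
OKPAY_TASK = 'payments.tasks.generic.ok_pay.OkPayPaymentChecker'
PAYEER_TASK = 'payments.tasks.generic.payeer.PayeerPaymentChecker'

CELERY_BEAT_SCHEDULE = {
    'check_okpay_payments': {
        'task': OKPAY_TASK,
        'schedule': timedelta(seconds=60),
        # Passed through to apply_async; mirrors expires=30 from the question.
        'options': {'expires': 30},
    },
    'check_payeer_payments': {
        'task': PAYEER_TASK,
        'schedule': timedelta(seconds=60),
        'options': {'expires': 30},
    },
}
```

One thing to check if beat or the worker complains about an unregistered task: in Celery 4 and later, class-based tasks are not registered automatically, so the class may need to be registered with the app (for example via `app.register_task(...)`) before the name in the schedule can be resolved.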