Search code examples
celeryschedulechain

How to schedule a chain task in celery


I want to run a complex task scheduled by beat. Let us assume the default add/mul tasks are defined.

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        crontab(),
        add.s(2,3) | mul.s(2)
    )

But this will return an error in the worker:

NotImplementedError: chain is not a real task

How can I schedule a non trivial task with celery beat?


Solution

  • One way to do that is to schedule your tasks chain in beat_schedule in your celeryconfig, using link option, celery_tasks here is a module name where your tasks are defined

    from celery.schedules import crontab
    from celery import signature
    
    beat_schedule = {
        'chained': {
            'task': 'celery_tasks.add',
            'schedule': crontab(),
            'options': {
                'queue': 'default',
                'link': signature('celery_tasks.mul',
                            args=(),
                            kwargs={},
                            options={
                                'link': signature('celery_tasks.another_task', 
                                    args=(),
                                    kwargs={}, 
                                    queue='default')
                            },
                            queue='default')
                },
             'args': ()
        }
    }