Tags: celery, celerybeat

Celery: add new tasks dynamically


Is it possible to schedule a task in celery from another task?

I've got this Python script:

import logging
from celery import Celery
from datetime import datetime

logger = logging.getLogger(__name__)

app = Celery('app', backend='amqp://', broker='pyamqp://guest@localhost:5672//')

@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result

@app.task()
def setPeriodicTask():
    now = datetime.now()  # current time, used as the args in option 2
    # option 1
    app.add_periodic_task(10, add.s(30, 1))
    # option 2
    app.conf.beat_schedule = {
        'add-every-5-seconds': {
            'task': 'app.add',
            'schedule': 5.0,
            'args': (now.hour, now.second)
        }
    }
    logger.info('setPeriodicTask succeeded')
    return 1

When I call the add task, it works OK.

If I call the setPeriodicTask task, it does not throw any error, but the add task is not scheduled. I've tried both options and neither works:

  1. add_periodic_task
  2. modify the beat_schedule

If I add this code to my Python script (as I've seen in the celery docs):

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5.0, add.s(10, 1))

I can see the add task running on schedule, as expected. So Celery and Celery beat seem to be working fine. But I want to enable/disable the task on demand.

Is it possible? And if so, what am I doing wrong?


Solution

  • In case someone else faces this issue:

    I ended up using a database, with an approach similar to the one mentioned in the django-celery-beat docs: django-celery-beat - Database-backed Periodic Tasks
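
    The answer does not show its code, so the following is only a minimal sketch of what a database-backed schedule with an on/off switch might look like. It uses the standard-library sqlite3 module rather than Django, and every name in it (the periodic_task table, open_store, add_task, set_enabled, due_tasks) is hypothetical, not part of Celery or django-celery-beat. The idea it illustrates: the schedule lives in a table that any process can update, instead of in app.conf, which is in-memory state of whichever process happens to modify it.

```python
import json
import sqlite3
import time

# Hypothetical schema: one row per periodic task, with an enabled flag.
SCHEMA = """
CREATE TABLE IF NOT EXISTS periodic_task (
    name     TEXT PRIMARY KEY,
    task     TEXT NOT NULL,              -- registered task name, e.g. 'app.add'
    args     TEXT NOT NULL,              -- JSON-encoded positional arguments
    every    REAL NOT NULL,              -- interval in seconds
    enabled  INTEGER NOT NULL DEFAULT 1,
    last_run REAL                        -- UNIX timestamp, NULL if never run
)
"""


def open_store(path=":memory:"):
    """Open (or create) the schedule database and ensure the table exists."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def add_task(conn, name, task, args, every):
    """Insert a schedule entry, replacing any existing entry with the same name."""
    conn.execute(
        "INSERT OR REPLACE INTO periodic_task (name, task, args, every) "
        "VALUES (?, ?, ?, ?)",
        (name, task, json.dumps(list(args)), float(every)),
    )
    conn.commit()


def set_enabled(conn, name, enabled):
    """Switch a schedule entry on or off without deleting it."""
    conn.execute(
        "UPDATE periodic_task SET enabled = ? WHERE name = ?",
        (int(bool(enabled)), name),
    )
    conn.commit()


def due_tasks(conn, now=None):
    """Return (name, task, args) for enabled entries whose interval has
    elapsed, and record `now` as their last run time."""
    now = time.time() if now is None else now
    rows = conn.execute(
        "SELECT name, task, args FROM periodic_task "
        "WHERE enabled = 1 AND (last_run IS NULL OR ? - last_run >= every)",
        (now,),
    ).fetchall()
    conn.executemany(
        "UPDATE periodic_task SET last_run = ? WHERE name = ?",
        [(now, name) for name, _, _ in rows],
    )
    conn.commit()
    return [(name, task, json.loads(args)) for name, task, args in rows]
```

    One possible way to wire this into the script from the question (an assumption, not something the answer states) is to register a single dispatcher task statically with sender.add_periodic_task in setup_periodic_tasks, and have it call due_tasks and submit each result with app.send_task(task, args=args). Enabling or disabling the add task on demand is then just a call to set_enabled from any process. django-celery-beat itself takes a different route: it provides a database-backed scheduler for the beat process rather than a dispatcher task.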