Search code examples
djangocroncelerybackenddjango-celery

Celery task is not getting registered?


I'm using Django + Celery to schedule tasks dynamically. The time for scheduling tasks will be queried from the user database or will be provided by the user dynamically. I'm trying to pass an argument to start_task function but it breaks the code because the scheduler can't add this to its Entry. It works fine without the additional argument (schedule_start_time). I think its something related to signal!!

def clean_time(time):
    time = time.get("created_at")
    hh = time.strftime("%H")
    mm = time.strftime("%M")
    ss = time.strftime("%S")
    dd = time.strftime("%d")
    mnth = time.strftime("%m")
    yy = time.strftime("%Y")
    return hh, mm, ss, dd, mnth, yy


@app.on_after_configure.connect
def start_time(schedule_start_date, **kwargs):
    hh, mm, ss, dd, mnth, yy = clean_time(schedule_start_date)
    app.add_periodic_task(crontab((hour='{}'.format(hh), minute='{}'.format(mm), day_of_month='{}'.format(dd), month_of_year='{}'.format(mnth)), test.s("hello"))


@app.task
def test(arg):
    print(arg)

Solution

  • H3ll0.

    You should use django-celery-beat to manage periodic celery tasks in django.

    This is a repo I created to test out django-celery-beat.

    Periodicall Repo

    Remember you should run the worker and the beat like is described in the documentation.

    django-celery-beat-documentation