I'm using Django + Celery to schedule tasks dynamically. The time for scheduling tasks will be queried from the user database or will be provided by the user dynamically. I'm trying to pass an argument to start_task function but it breaks the code because the scheduler can't add this to its Entry. It works fine without the additional argument (schedule_start_time). I think its something related to signal!!
def clean_time(time):
time = time.get("created_at")
hh = time.strftime("%H")
mm = time.strftime("%M")
ss = time.strftime("%S")
dd = time.strftime("%d")
mnth = time.strftime("%m")
yy = time.strftime("%Y")
return hh, mm, ss, dd, mnth, yy
@app.on_after_configure.connect
def start_time(schedule_start_date, **kwargs):
hh, mm, ss, dd, mnth, yy = clean_time(schedule_start_date)
app.add_periodic_task(crontab((hour='{}'.format(hh), minute='{}'.format(mm), day_of_month='{}'.format(dd), month_of_year='{}'.format(mnth)), test.s("hello"))
@app.task
def test(arg):
print(arg)
H3ll0.
You should use django-celery-beat to manage periodic celery tasks in django.
This is a repo I created to test out django-celery-beat.
Remember you should run the worker and the beat like is described in the documentation.