I have a novice level of experience with Celery. I have written many tasks, both scheduled and one-off delayed jobs, but that is it.
I am running into an issue where I want one task to kick off thousands of smaller jobs, to mitigate issues that could arise from queue length and from a single job that could take hours to complete.
The current application relies on information from external APIs: a user links their account with another service I have integrated, and I want to update the user's information daily with changes from their external account.
I have a scheduled job like this:
@app.task()
def refresh_accounts():
    for account in Account.objects.all():
        response = retrieve_account_info(account_id=account.id)
        account.data = response.data
        account.save()
--
What I would like is something like this:
@app.task()
def kickoff_refresh():
    for account in Account.objects.all():
        refresh_account.delay(account_id=account.id)

@app.task()
def refresh_account(account_id=None):
    account = Account.objects.get(id=account_id)
    response = retrieve_account_info(account_id=account.id)
    account.data = response.data
    account.save()
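Since retrieve_account_info hits an external API, I assume I would also want each per-account task to retry on transient failures. A minimal sketch of that (the bare except-on-Exception is a placeholder; I would narrow it to the API client's real error type):

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def refresh_account(self, account_id=None):
    account = Account.objects.get(id=account_id)
    try:
        response = retrieve_account_info(account_id=account.id)
    except Exception as exc:  # placeholder: narrow to the API client's actual error type
        # Re-enqueue this one account's refresh; the other accounts are unaffected.
        raise self.retry(exc=exc)
    account.data = response.data
    account.save()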
One approach I was thinking of was having kickoff_refresh and refresh_account in different queues, i.e. @app.task(queue='q1') and @app.task(queue='q2'). I do not know, however, whether there is a better way of doing this. Calling a task from inside another task on the same queue seems to be bad practice in Celery - the docs warn against launching synchronous subtasks: https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks. The task kickoff_refresh would be a periodic task running every several hours.
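If I went with the two-queue idea, I believe the routing could be configured roughly like this (a sketch; the queue names q1/q2 and the tasks module path are placeholders, and each queue would need its own worker started with -Q):

# Sketch: route the fan-out task and the per-account tasks to separate queues.
app.conf.task_routes = {
    'tasks.kickoff_refresh': {'queue': 'q1'},  # placeholder module path and queue name
    'tasks.refresh_account': {'queue': 'q2'},
}

# Each queue then gets its own worker, e.g.:
#   celery -A proj worker -Q q1
#   celery -A proj worker -Q q2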
I would love to hear what works for others. Thanks
from celery import group

@app.task()
def kickoff_refresh():
    # Fan out one refresh_account task per account as a single group.
    group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()

@app.task()
def refresh_account(account_id=None):
    account = Account.objects.get(id=account_id)
    response = retrieve_account_info(account_id=account.id)
    account.data = response.data
    account.save()
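To run kickoff_refresh every several hours, one option is to register it with Celery beat. A minimal sketch (the schedule name, module path, and six-hour interval are illustrative):

from celery.schedules import crontab

app.conf.beat_schedule = {
    'refresh-all-accounts': {
        'task': 'tasks.kickoff_refresh',            # placeholder module path
        'schedule': crontab(minute=0, hour='*/6'),  # illustrative: every six hours
    },
}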