python django celery django-celery celerybeat

Celery best practice for one task kicking off many small tasks


I have novice-level experience with Celery. I have written many tasks, both scheduled and one-off delayed jobs, but that is it.

I am running into an issue where I want to create one task that kicks off thousands of smaller jobs, to mitigate any problems that could arise from queue length and from a single job that could take hours to complete.

The current application relies on information from external APIs. A user links their account with another service I have integrated, and I want to update the user's information daily with changes from their external account.

I have a scheduled job like this:

@app.task() 
def refresh_accounts():
    for account in Account.objects.all():
        response = retrieve_account_info(account_id=account.id)
        account.data = response.data 
        account.save() 

--

What I want is something like this:

@app.task()
def kickoff_refresh():
    for account in Account.objects.all():
        refresh_account.delay(account_id=account.id)

@app.task() 
def refresh_account(account_id=None):
    account = Account.objects.get(id=account_id)
    response = retrieve_account_info(account_id=account.id)
    account.data = response.data 
    account.save()

One approach I was considering is to put kickoff_refresh and refresh_account on different queues: @app.task(queue=q1) and @app.task(queue=q2). I do not know, however, whether there is a better way of doing this. Calling a task inside a task on the same queue seems to be bad practice in Celery (https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks). The kickoff_refresh task would be a periodic task running every several hours.
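For reference, the two-queue idea can also be expressed in configuration rather than in the decorators, using Celery's task_routes setting. This is only a sketch: the queue names q1 and q2 come from the paragraph above, while the "tasks." module prefix is an assumption and should be replaced with the dotted path under which the tasks are actually registered.

```python
# Celery configuration sketch (for example, in a celeryconfig.py module).
# The "tasks." prefix is hypothetical; use your project's real task names.
task_routes = {
    # The periodic dispatcher goes to its own queue.
    "tasks.kickoff_refresh": {"queue": "q1"},
    # The many small per-account jobs go to a separate queue.
    "tasks.refresh_account": {"queue": "q2"},
}
```

A worker started with the -Q option (for example, -Q q2) would then consume only the per-account jobs, so a long backlog on q2 would not delay the dispatcher on q1.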

I would love to hear what works for others. Thanks


Solution

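from celery import group


@app.task()
def kickoff_refresh():
    # Build one signature per account and dispatch them together as a group.
    # apply_async() sends the tasks to the broker and returns immediately;
    # the parent task never waits on the results.
    job = group(
        refresh_account.s(account_id=account.id)
        for account in Account.objects.all()
    )
    job.apply_async()


@app.task()
def refresh_account(account_id=None):
    # Runs on a worker: fetch fresh data from the external API and store it.
    account = Account.objects.get(id=account_id)
    response = retrieve_account_info(account_id=account.id)
    account.data = response.data
    account.save()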
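If one message per account turns out to be too fine-grained, a possible variation is to split the account ids into batches and dispatch one task per batch. The helper below is a minimal sketch of the batching step only, in plain Python; the name batched_ids and the batch size are assumptions, not part of the answer above.

```python
from itertools import islice


def batched_ids(ids, size):
    """Yield lists of at most `size` ids from any iterable of ids."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(ids)
    while True:
        # Take the next `size` items; an empty list means we are done.
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Example: five ids in batches of two.
batches = list(batched_ids(range(5), 2))
```

Inside kickoff_refresh, each batch could then be passed to a hypothetical task that loops over the ids it receives. Whether that is worthwhile depends on how long each external API call takes compared with the per-message overhead.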