
Celery chord callback isn't always launched


I'm trying to use a chord to launch a report update once the upload has completed.

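    import pandas as pd
    from celery import chord, shared_task

    # EntityDataPoint and final_report are defined elsewhere in the project
    @shared_task(autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5})
    def upload(df: pd.DataFrame, **kwargs):
        ed = EntityDataPoint(df, **kwargs)
        # upload_database() returns the chord header plus metadata for the report
        uploadtasks, source, subtype = ed.upload_database()
        # Run every upload task, then final_report once all of them have finished
        chord(uploadtasks)(final_report.si(logger=ed.logger,
                                           source=source,
                                           subtype=subtype,
                                           index=ed.index))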

where `uploadtasks` is built as:

    g = group(unwrap_upload_bulk.s(obj=self, data=self.data.iloc[i:i + chunk_size])
              for i in range(0, len(self.data), chunk_size))
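The `range(0, len(self.data), chunk_size)` / `iloc[i:i+chunk_size]` pattern above is what decides how many subtasks end up in the chord header. A minimal pure-Python sketch of that arithmetic, using a hypothetical helper `chunk_bounds` that is not part of the original code, makes it easy to check how many header tasks a given DataFrame length and `chunk_size` will produce:

```python
from typing import List, Tuple


def chunk_bounds(n_rows: int, chunk_size: int) -> List[Tuple[int, int]]:
    """Return the (start, stop) row ranges covered by iloc[i:i + chunk_size].

    Hypothetical helper: it mirrors range(0, len(self.data), chunk_size) from
    the snippet above. The last range is clipped to n_rows, which matches how
    iloc slicing silently stops at the end of the DataFrame.
    """
    if chunk_size <= 0:
        # range() with step 0 raises, and a negative step would yield no chunks
        raise ValueError("chunk_size must be a positive integer")
    return [(i, min(i + chunk_size, n_rows)) for i in range(0, n_rows, chunk_size)]


# One header task is created per range, so the chord header length is:
print(len(chunk_bounds(250, 100)))  # 3
print(chunk_bounds(250, 100))       # [(0, 100), (100, 200), (200, 250)]
```

The header length depends only on the number of rows and `chunk_size`, not on what each chunk contains.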

When the chord header has more than two elements, the first two subtasks succeed, but the remaining tasks in the group and the callback are never launched. No error is reported anywhere, and nothing shows up in the Celery worker logs. Inspecting the workers with `celery inspect active` and `celery inspect scheduled` shows no task waiting in the queue.

If the header (the group) has two or fewer elements, everything works: the group tasks finish and the callback is called.

The behavior does not seem to depend on the size of each element: whether each subtask in the group sends 100 rows or 1000 rows, the result is the same.

If I just launch the group tasks, without the chord and the callback, the tasks succeed without any error.

I tried using different syntaxes for the chord, and it doesn't seem to change anything.

I tried the `group.link` feature to see what would happen. With it, the whole group does seem to finish, but of course the callback doesn't run after all the group tasks have finished, since, as I understood from the documentation, `link` is not designed for that. So it's not the behavior I want.

I'm using Celery 5.2.3 with a Redis 7.0.0 broker and a Django 3.2.13 backend with PostgreSQL, on Python 3.9. Everything runs in separate Docker containers.


Solution

  • It seems that passing the group directly as the chord header was causing the problem. It was probably using the first task in the group as the header and the second as the callback (though I can't understand why that didn't cause an error with the arguments of these tasks). Instead of returning:

    group(unwrap_upload_bulk.s(obj=self, data=self.data.iloc[i:i + chunk_size])
          for i in range(0, len(self.data), chunk_size))
    

    I now return:

    [unwrap_upload_bulk.s(obj=self, data=self.data.iloc[i:i + chunk_size])
     for i in range(0, len(self.data), chunk_size)]
    

    And it works just as expected.