Tags: django, asynchronous, celery

Log results of asynchronous processes to a database field


I am working on a project using Django and Celery that takes large files and processes them in multiple steps. The main object for a file is called Sequencing, and only one is created per file.

Now for every Sequencing we create many samples, variants, etc. Some of these are already in the db, so we use get_or_create. After each Sequencing has been processed, I would like the number of created objects and the number of reused objects written into a JSON field called metadata on the Sequencing object, but because some of the steps run asynchronously, this does not work out.
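For context, the counts come straight from get_or_create, which reports whether a row was inserted; per region the counting looks roughly like this (a simplified sketch; Variant, records, and the lookup fields are placeholders, not the real models):

log_dict = {"variants_create": 0, "variants_get": 0}
for record in records:
    variant, created = Variant.objects.get_or_create(
        contig=record.contig,
        position=record.position,
    )
    if created:
        log_dict["variants_create"] += 1  # newly inserted row
    else:
        log_dict["variants_get"] += 1  # reused existing row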

Some code excerpts:

models.py:

import datetime

from celery import chain
from django.db import models


class Sequencing(...):
    metadata = models.JSONField(
        default=dict,
        blank=True,
    )

    def log(self, level: str, msg: str | list, *args, **kwargs) -> None:
        """Records messages for users in metadata."""
        if "log" not in self.metadata:
            self.metadata["log"] = []
        self.metadata["log"].append(
            {
                "date": datetime.datetime.utcnow().isoformat(),
                "level": level,
                "msg": msg,
            }
        )
        self.save()

    def load(self) -> None:
        # Imported lazily to avoid a circular import with tasks.py.
        from sequencings.tasks import sequencing_load_variants

        chain(
            ...
            sequencing_load_variants.si(self.id),
            ...
        ).delay()

    def load_variants(self) -> None:
        from sequencings.tasks import sequencing_load_variants_by_contig

        ...
        for contig in contigs:
            sequencing_load_variants_by_contig.delay(self.pk, contig=contig)

    def load_variants_by_contig(self, contig: str) -> None:
        # create samples
        # create variants

        ...
        self.log(
            "info",
            [
                f"Variants created for region {region.seqid}: {log_dict['variants_create']}",
                f"Variants found for region {region.seqid}: {log_dict['variants_get']}",
                f"Samplerecords created for region {region.seqid}: {log_dict['samplerecords_create']}",
                f"Samplerecords found for region {region.seqid}: {log_dict['samplerecords_get']}",
            ],
        )

tasks.py:

@shared_task(bind=True, queue="computation")
def sequencing_load_variants(self, sequencing_pk: int) -> None:
    from sequencings.models import Sequencing

    sequencing_obj = Sequencing.objects.get(pk=sequencing_pk)
    sequencing_obj.load_variants()

@shared_task(bind=True, queue="computation")
def sequencing_load_variants_by_contig(self, sequencing_pk: int, contig: str) -> None:
    from sequencings.models import Sequencing

    sequencing_obj = Sequencing.objects.get(pk=sequencing_pk)
    sequencing_obj.load_variants_by_contig(contig=contig)

After the sequencing is saved, the metadata field holds the information for some of the regions but not all of them. Sometimes I only see the logs for a single region, sometimes for two, and not always the same regions either (I have had logs for region 8, for regions 19 and 22, for regions 9 and 22, and so on).

I think there is a collision when the asynchronous "load_variants_by_contig" tasks call the log function's save() method, but this is where my understanding stops. How can I analyse, debug and fix this?
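My best guess at the interleaving is the classic lost-update pattern (a sketch, not the actual project code):

# Two workers append to their own in-memory copy of the same row;
# whichever saves last silently overwrites the other's log entry.
obj_a = Sequencing.objects.get(pk=1)  # worker A: metadata == {"log": []}
obj_b = Sequencing.objects.get(pk=1)  # worker B: metadata == {"log": []}

obj_a.metadata["log"].append("entry A")
obj_a.save()  # db now holds ["entry A"]

obj_b.metadata["log"].append("entry B")
obj_b.save()  # db now holds ["entry B"] -- "entry A" is lost

Each task reads its own copy of the row, appends in memory, and writes the whole metadata column back, so the last writer discards the other entries.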


Solution

  • If anyone comes across this later: the issue was indeed a race condition. The solution was to store all per-contig results in Redis and to chain the writing of the logs after all parsing had finished, so only a single task ever saves the metadata field.
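A minimal sketch of that approach, assuming a reachable Redis instance and a configured Celery result backend (chords require one); the task names and the Redis key are illustrative, not the original code. Chaining a group of tasks into a follow-up task is what Celery calls a chord, so that primitive is used here:

import json

import redis
from celery import chord, shared_task

r = redis.Redis()  # assumes Redis on localhost:6379


@shared_task(queue="computation")
def load_contig(sequencing_pk: int, contig: str) -> None:
    # ... create samples and variants, filling log_dict as before ...
    log_dict = {"contig": contig, "variants_create": 0, "variants_get": 0}
    # Buffer the per-contig result in Redis instead of saving the model.
    r.rpush(f"sequencing:{sequencing_pk}:log", json.dumps(log_dict))


@shared_task(queue="computation")
def write_log(_header_results, sequencing_pk: int) -> None:
    from sequencings.models import Sequencing

    key = f"sequencing:{sequencing_pk}:log"
    entries = [json.loads(raw) for raw in r.lrange(key, 0, -1)]
    r.delete(key)

    # Exactly one writer touches metadata, after all contigs are done.
    sequencing_obj = Sequencing.objects.get(pk=sequencing_pk)
    sequencing_obj.metadata.setdefault("log", []).extend(entries)
    sequencing_obj.save()


def load_variants(sequencing_pk: int, contigs: list[str]) -> None:
    # One task per contig; write_log runs once, after every contig task.
    header = [load_contig.si(sequencing_pk, contig=c) for c in contigs]
    chord(header)(write_log.s(sequencing_pk))

The important property is that write_log is the only code path that saves metadata, and it runs exactly once, after every per-contig task has pushed its counts to Redis.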