Search code examples
python django django-queryset

Annotate django queryset based on related field attributes


Suppose you have this model structure in a Django project:

from django.db import models

class Object(models.Model):
    # Name of the object, at most 100 characters.
    name = models.CharField(max_length=100)


class ObjectEvent(models.Model):
    # An event of a given type attached to one Object.
    class EventTypes(models.IntegerChoices):
        # Integer codes used for `event_type`, each paired with its label.
        CREATED = 1, "Created"
        SCHEDULED = 2, "Scheduled"
        COMPLETED = 3, "Completed"
        CANCELED = 4, "Canceled"

    # Integer code of the event; choices are drawn from EventTypes.
    event_type = models.IntegerField(choices=EventTypes.choices)
    # Object this event belongs to. Deleting the Object cascades to its
    # events, and the reverse accessor / lookup name is `events`.
    object = models.ForeignKey(Object, on_delete=models.CASCADE, related_name="events")

I now want to access a derived property `ended`, which is true for every object that has an event of type COMPLETED or CANCELED. I already did that with the @property decorator, but I also want to be able to filter on the `ended` attribute, so I'm trying to implement this through the queryset `annotate` method.

class ObjectManager(models.Manager):
        # Manager whose querysets carry a boolean `ended` annotation.
        def get_queryset(self):
            qs = super().get_queryset()
            # NOTE: the `When` condition follows the reverse `events` relation,
            # so the query is joined to ObjectEvent and `ended` is evaluated
            # once per joined event row rather than once per object. An object
            # with several events therefore appears several times, with
            # `ended` True only on the rows whose event is COMPLETED/CANCELED.
            qs = qs.annotate(
                ended=models.Case(
                    models.When(
                        events__event_type__in=(
                            ObjectEvent.EventTypes.COMPLETED,
                            ObjectEvent.EventTypes.CANCELED,
                        ),
                        then=models.Value(True),
                    ),
                    # Rows whose event is of any other type fall through here.
                    default=models.Value(False),
                    output_field=models.BooleanField(
                        verbose_name="Object ended",
                    ),
                )
            )
            return qs
class Object(models.Model):
    # Use ObjectManager as `objects` so querysets obtained through it include
    # the `ended` annotation added in ObjectManager.get_queryset.
    objects = ObjectManager()
    # Name of the object, at most 100 characters.
    name = models.CharField(max_length=100)

fake_data.json

[
  { "model": "main.object", "pk": 1, "fields": { "name": "task1" } },
  { "model": "main.object", "pk": 2, "fields": { "name": "task2" } },
  { "model": "main.object", "pk": 3, "fields": { "name": "task3" } },
  { "model": "main.object", "pk": 4, "fields": { "name": "task4" } },
  {
    "model": "main.objectevent",
    "pk": 1,
    "fields": { "event_type": 1, "object": 1 }
  },
  {
    "model": "main.objectevent",
    "pk": 2,
    "fields": { "event_type": 2, "object": 1 }
  },
  {
    "model": "main.objectevent",
    "pk": 3,
    "fields": { "event_type": 4, "object": 1 }
  },
  {
    "model": "main.objectevent",
    "pk": 4,
    "fields": { "event_type": 1, "object": 2 }
  },
  {
    "model": "main.objectevent",
    "pk": 5,
    "fields": { "event_type": 1, "object": 3 }
  },
  {
    "model": "main.objectevent",
    "pk": 6,
    "fields": { "event_type": 2, "object": 3 }
  },
  {
    "model": "main.objectevent",
    "pk": 7,
    "fields": { "event_type": 3, "object": 3 }
  },
  {
    "model": "main.objectevent",
    "pk": 8,
    "fields": { "event_type": 1, "object": 4 }
  },
  {
    "model": "main.objectevent",
    "pk": 9,
    "fields": { "event_type": 2, "object": 4 }
  }
]

Now, trying this with the fake data above, I get a strange result in the manage.py shell:

>>> ended_objects = Object.objects.filter(ended=True)
>>> ended_objects.count()
2 # this is fine
>>> not_ended_objects = Object.objects.filter(ended=False)
>>> not_ended_objects.count()
7 # why?
>>> not_ended_objects.distinct().count()
4 # Even using distinct doesn't resolve the problem

What am I missing?


Solution

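  • The Case/When annotation joins every object to its events, so each object is returned once per event row and `ended` is computed per row. Annotate with an Exists subquery instead, which yields a single boolean per object. You should try this for model.py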

    
        from django.db import models
        from django.db.models import OuterRef, Subquery, Exists
            
        class ObjectManager(models.Manager):
            """Manager that annotates each object with a boolean ``ended``."""

            def get_queryset(self):
                """Return the base queryset with an ``ended`` annotation.

                ``ended`` is an ``Exists`` test over the ``ObjectEvent`` rows
                that point at the outer object and whose type is COMPLETED
                or CANCELED.
                """
                terminal_types = [
                    ObjectEvent.EventTypes.COMPLETED,
                    ObjectEvent.EventTypes.CANCELED,
                ]
                # Correlated on the outer object's primary key via OuterRef.
                terminal_events = ObjectEvent.objects.filter(
                    object=OuterRef('pk'),
                    event_type__in=terminal_types,
                )
                base = super().get_queryset()
                return base.annotate(ended=Exists(terminal_events))
        
        class Object(models.Model):
            # Name of the object, at most 100 characters.
            name = models.CharField(max_length=100)
            # Use ObjectManager as `objects` so querysets obtained through it
            # include the `ended` annotation.
            objects = ObjectManager()