Search code examples
pythondjangoconditional-statementscaseannotate

Django Annotate foreign key - user specific status to an object


There are Widget and UserStatus models.

Challenge: Get a QuerySet of widgets, annotated with unified status from user and widget.

Logic: If has user status, use that, else use the widget's status.

https://gist.github.com/Lucianovici/53c00ec62f631580dac774cfa4a1578b

class WidgetManager(models.Manager):
    def with_user_status(self, user: User = None):
        """This approach duplicates the same widget for each UserStatus"""
        return self.annotate(
            unified_status_id=Case(
                When(user_status__user=user, then='user_status__status_id'),
                default=F('status_id'),
                output_field=models.IntegerField(),
            )
        ).order_by('unified_status_id')

class Widget(models.Model):
    STATUS_NOK = 1
    STATUS_OK = 2
    STATUS_NONE = 3

    name = models.CharField(max_length=40)
    status_id = models.IntegerField(default=STATUS_NONE)

    objects = WidgetManager()

    def __str__(self):
        return f'{self.name} - status: {self.status_id}'


class UserStatus(models.Model):
    user = models.ForeignKey(to=User, on_delete=models.CASCADE)
    widget = models.ForeignKey(to=Widget, on_delete=models.CASCADE, related_name='user_status')
    status_id = models.IntegerField(default=Widget.STATUS_NONE)

    def __str__(self):
        return f'{self.widget.name} - status: {self.status_id}'

Let's try it out.

u1 = User.objects.filter(username='user1').first() or User.objects.create_user('user1')
u2 = User.objects.filter(username='user2').first() or User.objects.create_user('user2')

w1 = Widget.objects.create(name='Widget1', status_id=Widget.STATUS_NONE)
w2 = Widget.objects.create(name='Widget2', status_id=Widget.STATUS_OK)
w3 = Widget.objects.create(name='Widget3', status_id=Widget.STATUS_NOK)

UserStatus.objects.create(user=u1, widget=w1, status_id=Widget.STATUS_NOK)
UserStatus.objects.create(user=u2, widget=w1, status_id=Widget.STATUS_OK)

qs = Widget.objects.all()
print(f'All widgets: {qs}')

qs = Widget.objects.with_user_status(user=u1)
print(f'Widgets with user status {[(w.name, w.unified_status_id) for w in qs]}')

I get:

All widgets: <QuerySet [<Widget: Widget1 - status: 3>, <Widget: Widget2 - status: 2>, <Widget: Widget3 - status: 1>]>
Widgets with user status [('Widget1', 1), ('Widget3', 1), ('Widget2', 2), ('Widget1', 3)]

Expected result:

Widgets with user status [('Widget1', 1), ('Widget3', 1), ('Widget2', 2)]

I also tried .distinct() at the end of the QuerySet, but without any luck. Thank you!


Solution

  • Well, I have a solution, but I'm not proud of it :) I think it can be done better than this.

    class WidgetManager(models.Manager):
        def with_user_status(self, user: User = None):
            return self.annotate(
                user_id=F('user_status__user'),
                unified_status_id=Case(
                    When(user_status__user=user, then='user_status__status_id'),
                    default=F('status_id'),
                    output_field=models.IntegerField(),
                )
            ).exclude(
                Q(user_id__isnull=False) & ~Q(user_id=user.id)
            ).order_by('unified_status_id')
    

    What I did, was to exclude all the other widgets that have different user status that the given one.

    qs = Widget.objects.with_user_status(user=u1)
    print(f'Widgets with user status {[(w.name, w.unified_status_id, w.user_id) for w in qs]}')
    

    Outputs:

    Widgets with user status [('Widget1', 1, 428), ('Widget3', 1, None), ('Widget2', 2, None)]
    

    Where 428 is my u1 user id.

    SQL:

    SELECT "core_widget"."id",
           "core_widget"."name",
           "core_widget"."status_id",
           "core_userstatus"."user_id"            AS "user_id",
           CASE
               WHEN "core_userstatus"."user_id" = 428 THEN "core_userstatus"."status_id"
               ELSE "core_widget"."status_id" END AS "unified_status_id"
    FROM "core_widget"
             LEFT OUTER JOIN "core_userstatus" ON ("core_widget"."id" = "core_userstatus"."widget_id")
    WHERE NOT ("core_userstatus"."user_id" IS NOT NULL AND NOT ("core_userstatus"."user_id" = 428))
    ORDER BY "unified_status_id" ASC