Search code examples
pythondjangoauthenticationdjango-guardian

Can django-guardian and django-rules be used together?


I'd like to be able to create per-object permissions using django-guardian.

But I'd like to add a layer of logic surrounding these permissions. For example if someone has edit_book permission on a Book, then their permission to edit Pages in that book should be implicit. The rules package seems ideal.


Solution

  • tl;dr Yes they can, and we can address some of the scalability issues of Rules, but you can't get round running a query per object so permissions-filtered queries are expensive. A hybrid but more complex solution is suggested at the bottom that gets round this by compiling lazy rule sets into SQL at run time, using lazy Q-like objects.

    The following appears to work:

    import rules
    import guardian
    
    @rules.predicate
    def is_page_book_editor(user, page):
        return user.has_perm('books.edit_book', page.book)
    
    @rules.predicate
    def is_page_editor(user, page):
        return user.has_perm('pages.edit_page', page)
    
    rules.add_perm('pages.can_edit_page', is_page_book_editor | is_page_editor)
    

    Then to check:

    joe.has_perm('pages.can_edit_page', page34)
    

    Or:

    @permission_required('pages.can_edit_page', fn=objectgetter(Page, 'page_id'))
    def post_update(request, page_id):
        # ...
    

    With the authentication backend defined:

    AUTHENTICATION_BACKENDS = (
        'rules.permissions.ObjectPermissionBackend',
        'django.contrib.auth.backends.ModelBackend',
        'guardian.backends.ObjectPermissionBackend',
    )
    

    The imports:

    from django.contrib.auth.models import User
    import rules
    import guardian
    from guardian.shortcuts import assign_perm
    from myapp.models import Book, Page
    

    The tests:

    joe = User.objects.create(username='joe', email='[email protected]')
    page23 = Page.objects.filter(id=123)
    assign_perm('edit_page', joe, page23)
    joe.has_perm('edit_page', page23)
    is_page_editor(joe, page23)  # returns True
    joe.has_perm('can_edit_page', i)  # returns True
    
    rules.remove_perm('can_edit_page')
    rules.add_perm('can_edit_page', is_page_book_editor & is_page_editor)
    joe.has_perm('can_edit_page', i)  # returns False
    

    A problem with this is that each time a rule is checked, each predicate makes a call to the database. The following adds caching so that there is only one query for each rule check:

    @rules.predicate
    def is_page_book_viewer(user, instance):
        if is_page_book_viewer.context.get('user_perms') is None:
            is_page_book_viewer.context['user_perms'] = guardian.shortcuts.get_perms(user, page.book)
        return 'view_book' in is_page_book_viewer.context.get('user_perms')
    
    @rules.predicate(bind=True)
    def is_page_viewer(self, user, instance):
        if self.context.get('user_perms') is None:
            self.context['user_perms'] = guardian.shortcuts.get_perms(user, instance)
        return 'view_page' in self.context.get('user_perms')
    

    (I bind in the second example and use self, but this is identical to using the predicate name.)


    As you're doing complex, composite permissions, it is probably wise to replace django-guardian's generic foreign keys with real ones that can be optimized and indexed by the database like so:

    class PageUserObjectPermission(UserObjectPermissionBase):
        content_object = models.ForeignKey(Page)
    
    class PageGroupObjectPermission(GroupObjectPermissionBase):
        content_object = models.ForeignKey(Page)
    
    class BookUserObjectPermission(UserObjectPermissionBase):
        content_object = models.ForeignKey(Book)
    
    class BookGroupObjectPermission(GroupObjectPermissionBase):
        content_object = models.ForeignKey(Book)
    

    There is a bug. We're caching permissions on Page and Book in the same place - we need to distinguish and cache these separately. Also, let's encapsulate the repeated code into its own method. Finally, let's give get() a default to make sure we don't re-query a user's permissions when they have None.

    def cache_permissions(predicate, user, instance):
        """
        Cache all permissions this user has on this instance, for potential reuse by other predicates in this rule check.
        """
        key = 'user_%s_perms_%s_%s' % (user.pk, type(instance).__name__, instance.pk)
        if predicate.context.get(key, -1) == -1:
            predicate.context[key] = guardian.shortcuts.get_perms(user, instance)
        return predicate.context[key]
    

    This way object permissions will be cached separately. (Including user id in key is unnecessary as any rule will only check one user, but is a little more future-proof.)

    Then we can define our predicates as follows:

    @rules.predicate(bind=True)
    def is_page_book_viewer(self, user, instance: Page):
        return 'view_book' in cache_permissions(self, user, instance.book)
    

    One limitation of rules is permission checks have to be done individually based on user, but we often have to get all objects a user has a given permission on. For example to get a list of all pages the user has edit permissions on I need to repeatedly call [p for p in Pages.objects.all() if usr.has_perm('can_edit_page', p)], rather than usr.has_perm('can_edit_page') returning all permitted objects in one query.

    We can't fully address this limitation, but where we don't need to check every object in a list, we can reduce the number of queries using next and lazy generator coroutine-based querysets. In the above example we could use (...) rather than [...] if we may not go to the end of the list, and next(...) if we only need to check whether any object in the list has the permission. break or return would be the equivalents in normal looping code, as below.

    I have a situation where a Model has a self-join hierarchy, and I just need to know if any of a model's descendants has a permission. The code must recursively query the table with successive nodes' descendants. But as soon as we find an object with the permission, we needn't query any further. I have done this as follows. (Note I am interested in whether anyone has the permission on an object, and I've specified non-generic keys. If you're checking the permission for a specific user you can use user.has_perm('perm_name', obj) to use your rules.)

    class Foo(models.Model):
        parent = models.ForeignKey('Foo', blank=True, null=True)
    
        def descendants(self):
            """
            When callers don't need the complete list (eg, checking if any dependent is 
            viewable by any user), we run fewer queries by only going into the dependent 
            hierarchy as much as necessary.
            """
            immediate_descendants = Foo.objects.filter(parent=self)
            for x in immediate_descendants:
                yield x
            for x in immediate_descendants:
                for y in x.descendants():
                    yield y
    
        def obj_or_descendant_has_perm(self, perm_code):
            perm_id = Permission.objects.get(codename=perm_code).id
    
            if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                      content_object=self).exists()
                return True
            if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                       content_object=self).exists()
                return True
    
            for o in self.descendants():
                if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                          content_object=self).exists()
                    return True
                if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                           content_object=self).exists()
                    return True
    
            return False
    

    If you have a self-join that is this simple, check out treebeard for more efficient ways of modelling hierarchies (materialized paths, nested sets or adjacency lists). In my case the self-join was via other tables so this wasn't possible.

    I went a step further and permitted group selects by returning querysets from descendants:

    class Foo(models.Model):
        parent = models.ForeignKey('Foo', blank=True, null=True)
    
        def descendants(self):
            """
            When callers don't need the complete list (eg, checking if any dependent is 
            viewable by any user), we run fewer queries by only going into the dependent 
            hierarchy as much as necessary. Returns a generator of querysets of Foo objects.
            """
            immediate_descendants = Foo.objects.filter(parent=self)
            yield immediate_descendants
            for x in immediate_descendants:
                for y in x.descendants():
                    yield y
    
        def obj_or_descendant_has_perm(self, perm_code):
            perm_id = Permission.objects.get(codename=perm_code).id
    
            if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                      content_object=self).exists()
                return True
            if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                       content_object=self).exists()
                return True
    
            for gen in self.descendants():
                if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                          content_object__in=gen).exists()
                    return True
                if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                           content_object__in=gen).exists()
                    return True
    
            return False
    

    However unfortunately, you can only prefetch and cache per-object, not across all objects in a query, because django-guardian generates a SQL sub-query for the Rules layer checks as each object is checked, which you can't pre-cache. Also, having both permission backends registered in Django mean both are checked every time, which could lead to unexpected approvals if a name is duplicated.

    So I went for something like Rules, but that lazily compiles into a Queryset filter on execution. Here's a contrived example to demonstrate boolean combination into rules of Guardian permissions, Django Q objects and lazily evaluated Q objects:

    add_rule('kimsim_app.model_run.view',
        LazyGuardianPermission('kimsim_app.view_model') &
        (
            LazyGuardianPermission('kimsim_app.saved_model') |
            LazyGuardianPermission('kimsim_app.saved_model')
        )
        & ~LazyQ('modelgroupobjectpermission__group__user', 'request.user')
        & ~Q(number_of_failures__lte=42)
    )
    lazy = LazyPermission('kimsim_app.model_run.view') & ~LazyGuardianPermission('kimsim_app.view_model')
    

    Then, to get all 'Models' that a user has access to:

    Model.objects.filter(lazy.convert_to_q(user=u))
    

    To work out whether a user has access to Model m (returns m if so, or None if not):

    Model.objects.filter(lazy.convert_to_q(user=u, obj=m))
    

    This can then be built into a permission backend for DRF, Django admin, template tags, etc. The code:

    class BaseLazyQ(Q):
        """
        Is the type instantiated by the Q library when it parses the operators linking Q, LazyQ, LazyPermission and other
        BaseLazyQ subclasses, and generates the 'lazy' Q node tree for a rule.
        """
        def __init__(self, *args, **kwargs):
            # If no args, this is a connector node joining two sub-clauses
            # Or if args[0] is not a string, this is a standard, fully declared Q object
            super(BaseLazyQ, self).__init__(*args, **kwargs)
            logging.info('Instantiated fully declared BaseLazyQ %s, children %s', self.connector, self.children)
    
        # These two overrides force the connector nodes to be of type `BaseLazyQ`, ie, `LazyQ() & LazyGuardian()`
        # creates a BaseLazyQ connector node. These are then simply cloned on calling `convert_to_q()`.
        # Sub-classes do different conversion processing in `convert_to_q`.
    
        # They also disable `squash`, as child Q objects have not been instantiated yet: 'lazy' Q or
        # guardian permissions are still awaiting values request & obj.
    
        def _combine(self, other, conn):
            if not isinstance(other, Q):
                raise TypeError(other)
            obj = BaseLazyQ()
            obj.connector = conn
            obj.add(self, conn, squash=False)
            obj.add(other, conn, squash=False)
            return obj
    
        def __invert__(self):
            obj = BaseLazyQ()
            obj.add(self, self.AND, squash=False)
            obj.negate()
            return obj
    
        def convert_to_q(self, *args, **kwargs):
            """
            Generates a tree of fully specified Q() objects at run time from our tree of lazy Q, Guardian and LazyPermission
            objects, by passing them the ``request`` and ``obj`` objects of the current request.
    
            Note that only kwargs ``request`` or ``obj`` can be used if you will integrate with Django ModelAdmin and
            django-restframework permissions classes.
    
            :param request: From the current request
            :param obj: Optional - the object permissions are being tested for, if this is object-specific.
            :return: A tree of Q() objects that can be applied to a queryset of type ``obj``
            """
            logging.info('Converting fully declared BaseLazyQ conn %s children %s', self.connector, self.children)
            q = self.__class__._new_instance(children=[], connector=self.connector, negated=self.negated)
            for predicate in self.children:
                if isinstance(predicate, BaseLazyQ):
                    # Including subclasses
                    q.children.append(predicate.convert_to_q(*args, **kwargs))
                else:
                    # Q or Node
                    q.children.append(predicate.clone())
                logging.info('Cloning child Q %s', predicate)
            return q
    
    
    class AlwaysQ(BaseLazyQ):
        """
        This class is used for permissions that are always granted or denied regardless of user, request, object, etc.
        """
        def __init__(self, always_allow, *args, **kwargs):
            """
            Initializes a class which always permits or denies a particular permission. Still subject to boolean operators,
            ie, `AlwaysQ('allow') & [some failing test/s]` will refuse permission.
            Likewise `AlwaysQ('deny') | [some passing test/s]` will grant permission.
            :param always_allow: Must be set to `'allow'` to always allow, or `'deny'` to always deny.
            """
            super(AlwaysQ, self).__init__(*args, **kwargs)
            if not always_allow in ['allow', 'deny']:
                raise LazyPermDeclarationError('AlwaysQ must be declared as either \'allow\' or \'deny\'.')
            self.always_allow = always_allow
    
        def convert_to_q(self, *args, **kwargs):
            return Q(pk__isnull=not self.always_allow)
    
    
    class LazyQ(BaseLazyQ):
    
        def __init__(self, *args, **kwargs):
            super(LazyQ, self).__init__(*args, **kwargs)
            if args and len(args) == 2 and isinstance(args[0], str) and isinstance(args[1], str):
                logging.info('Instantiating LazyQ %s %s', args[0], args[1])
                self.field = args[0]
                attrs = args[1].split('.')
                self.parameter = attrs[0]
                self.attributes = attrs[1:]
            else:
                raise LazyPermDeclarationError('LazyQ must be declared with a Q query string and the naming of the '
                                            'parameter attributes to assign it.')
    
        def convert_to_q(self, *args, **kwargs):
            """
            Generates a tree of fully specified Q() objects at run time, from our tree of lazy LazyPermission() and LazyQ()
            objects, by passing them the ``request`` and ``obj`` objects of the current request.
    
            Note that only kwargs ``request`` or ``obj`` can be used if we are to integrate with Django ModelAdmin and
            django-restframework permissions classes.
    
            :param request: From the current request
            :param obj: Optional - the object permissions are being tested for, if this is object-specific.
            :return: A tree of Q() objects that can be applied to a queryset of type ``obj``
            """
            logging.info('Converting LazyQ conn %s children %s args %s kwargs %s', self.connector, self.children, args, kwargs)
            value = kwargs[self.parameter]
            for attr in self.attributes:
                value = getattr(value, attr)
                logging.info('attr %s = %s', attr, value)
            return Q((self.field, value))
    
    
    class LazyGuardianPermission(BaseLazyQ):
        """
        This class supports lazy guardian permissions, whose request and obj are to be passed at runtime.
        """
        def __init__(self, permission, globals_override=False, use_groups=True, related_object=None, *args, **kwargs):
            """
            Instantiates a lazy guardian permission that can later be converted to fully defined Q objects when passed
            request and (optionally) obj at request time.
    
            :param permission: The fully qualified guardian permission name, including the app label, eg, app.action_model
    
            :param globals_override: If 'allow', if the user has the permission on the model, then they have the
            permission on every object. It is not possible to disable global permissions inherited through group ownership.
    
            If 'deny', the user must have *both* the global permission, and the permission on the object. Removing the
            global permission for a user effectively removes their permission on all that model's objects.
    
            It is not possible to disable global permissions inherited through group ownership, and so only use those
            allocated to a user, ie, use_groups has no effect and is always True for global permission checks, as they are
            provided by the Django auth ModelBackend.
    
            Default False, which means global permissions are ignored.
    
            :param use_groups: If False, permissions a user has by group membership will not be considered.
            Default True, which means this check will check the permissions of groups the user is in. Note that this does
            not affect the `allow_groups` option, or checks that are not object-specific. These will always include group
            permissions, as determined by the Django auth ModelBackend.
    
            :param related_object: If the guardian permission is on a model related to the current one, this is the
            query string path from the current model to that model.
            """
            logging.info('Instantiating LazyGuardianPermission %s', permission)
            super(LazyGuardianPermission, self).__init__(*args, **kwargs)
            if isinstance(permission, str):
                perm_elems = permission.split('.')
                if len(perm_elems) == 2:
                    # This specifies a guardian permission
                    self.app_label = perm_elems[0]
                    try:
                        self.permission = Permission.objects.select_related('content_type')\
                                                            .get(content_type__app_label=perm_elems[0],
                                                                codename=perm_elems[1])
                    except Permission.DoesNotExist:
                        raise LazyPermDeclarationError('Guardian permission %s not found. LazyGuardianPermission must be '
                                                    'passed a fully qualified guardian permission, eg, '
                                                    'app.action_model. Q, LazyQ or LazyPermission objects can also be '
                                                    'used.' % permission)
                    self.related_object = related_object
                    self.use_groups = use_groups
                    self.globals_override = globals_override
                    self.model_cls = self.permission.content_type.model_class()
                else:
                    raise LazyPermDeclarationError('Guardian permission %s not found. LazyGuardianPermission must contain '
                                                'a fully qualified guardian permission, eg, app_action_model. Q, LazyQ '
                                                'or LazyPermission objects can also be used.' % permission)
            else:
                raise LazyPermDeclarationError('LazyGuardianPermission must be declared with a fully qualified guardian '
                                            'permission name, eg, app.action_model. <%s> not a valid parameter.' %
                                            str(permission))
    
        def convert_to_q(self, user, obj=None):
            """
            Generates a tree of fully specified Q() objects at run time to test this Guardian permission, by passing them
            the `request` and `obj` objects of the current request.
    
            :param user: From the current request
            :param obj: Optional - the object permissions are being tested for, if this is object-specific.
            :return: A tree of Q() objects that can be applied to a queryset of type ``obj``
            """
            logging.info('Converting LazyGuardianPermission %s%s', '~' if self.negated else '', self.permission.codename)
    
            if self.globals_override:
                has_global = user.has_perm('%s.%s' % (self.app_label, self.permission.codename))
                if has_global and self.globals_override == 'allow':
                    return Q(pk__isnull=False)
                elif not has_global and self.globals_override == 'deny':
                    return Q(pk__isnull=True)
    
            related_object_prefix = '%s__' % self.related_object if self.related_object else ''
    
            user_obj_perms_model = get_user_obj_perms_model(self.model_cls)
            group_obj_perms_model = get_group_obj_perms_model(self.model_cls)
    
            # logging.info('%s %s %s', self.model_cls, user_obj_perms_model, user_obj_perms_model.objects)
    
            if user_obj_perms_model.objects.is_generic():
                raise LazyPermDeclarationError('%s appears to be using generic foreign keys. LazyPermissions '
                                            'does not support Guardian permissions maintained via generic '
                                            'foreign keys, and insists you specify a custom table joining '
                                            'object, permission and user, for example `class '
                                            'DatasetUserObjectPermission(UserObjectPermissionBase): '
                                            'content_object = models.ForeignKey(Dataset)` and likewise '
                                            'for Groups. This is also more performant and maintains '
                                            'referential integrity.' % self.permission)
    
            user_obj_perms_model_ref = '%s%s' % (related_object_prefix,
                                                user_obj_perms_model.content_object.field.related_query_name())
    
            if obj:
                filters = (
                    Q(('%s__user' % user_obj_perms_model_ref, user)) &
                    Q(('%s__permission' % user_obj_perms_model_ref, self.permission)) &
                    Q(('%s__content_object' % user_obj_perms_model_ref, obj.pk))
                )
            else:
                filters = (
                    Q(('%s__user' % user_obj_perms_model_ref, user)) &
                    Q(('%s__permission' % user_obj_perms_model_ref, self.permission))
                )
    
            if self.use_groups:
    
                if user_obj_perms_model.objects.is_generic():
                    raise LazyPermDeclarationError('%s appears to be using generic foreign keys. LazyPermissions '
                                                'does not support Guardian permissions maintained via generic '
                                                'foreign keys, and insists you specify a custom table joining '
                                                'object, permission and user, for example `class '
                                                'DatasetGroupObjectPermission(GroupObjectPermissionBase): '
                                                'content_object = models.ForeignKey(Dataset)` and likewise '
                                                'for Users. This is also more performant and maintains '
                                                'referential integrity.' % self.permission)
    
                group_obj_perms_model_ref = '%s%s' % (related_object_prefix,
                                                    group_obj_perms_model.content_object.field.related_query_name())
    
                if obj:
                    filters |= (
                        Q(('%s__group__user' % group_obj_perms_model_ref, user)) &
                        Q(('%s__permission' % group_obj_perms_model_ref, self.permission)) &
                        Q(('%s__content_object' % group_obj_perms_model_ref, obj.pk))
                    )
                else:
                    filters |= (
                        Q(('%s__group__user' % group_obj_perms_model_ref, user)) &
                        Q(('%s__permission' % group_obj_perms_model_ref, self.permission))
                    )
    
            logging.info('Converted non-declared LazyGuardianPermission %s%s filters %s',
                        '~' if self.negated else '', self.permission.codename, filters)
    
            return Q(filters)
    
    
    class LazyPermission(BaseLazyQ):
        """
        This class supports recursive LazyPermission references, converted to lazy q or guardian checks on
        declaration then treated identically on calling.
        """
        def __init__(self, permission=None, *args, **kwargs):
            logging.info('instantiating gorm permission=%s', permission)
            super(LazyPermission, self).__init__(*args, **kwargs)
            if isinstance(permission, str):
                try:
                    # This is a recursive LazyPermission reference, so add it as a sub-tree
                    self.children.append(default_rules[permission])
                except KeyError:
                    raise LazyPermDeclarationError('%s not found in rule_set. LazyPermission must contain a fully '
                                                'qualified guardian permission, eg, app.action_model, or another '
                                                'LazyPermission\'s key.' % permission)
    
                logging.info('Instantiated LazyPermission %s as LazyGuardianPermission sub-tree %s.',
                            permission, self.children)
            else:
                raise LazyPermDeclarationError('LazyPermission must be declared with either a fully qualified guardian '
                                            'permission, eg, app.action_model, or another LazyPermission\' key.')
    
    
    class RuleSet(dict):
        def test_rule(self, name, *args, **kwargs):
            return name in self and self[name].convert_to_q(*args, **kwargs)
    
        def rule_exists(self, name):
            return name in self
    
        def add_rule(self, name, pred):
            if name in self:
                raise KeyError('A rule with name `%s` already exists' % name)
            self[name] = pred
    
        def remove_rule(self, name):
            del self[name]