Tags: celery, django-celery

How do I find task ids for Celery task events based on the arguments passed to them?


I'm stuck on something I imagine is simple. I am trying to determine whether I can find all task ids for a known set of arguments. Is this possible with Celery 4.4's API, or do I have to write my own interface to the results table in the Django ORM and search it myself?

My use case is one I imagine might be a common one. I have tasks that are scheduled to create Activity and Notification objects for feeds and user profiles. These are set to retry until successful since I need to ensure these objects are created.

However, under high load, when asynchronous tasks fall behind, a user can post a comment and then delete it before the activity or notification objects are created. In that situation the tasks need to be cancelled; otherwise they will be re-queued forever.

The obvious solution seemed to be to call revoke() from the post_delete signal on every task whose arguments contain the deleted object's id, after retrieving those task ids from the events API. However, I haven't been able to find an interface in the API docs that allows this. The answer may lie in celery.events.state.Task, but even if that is an interface for querying tasks, I'm not sure args is searchable. My activity tasks are passed the object id and the created boolean from Django's post_save signal.

I could fetch all tasks of the relevant type, such as project.activity.create_comment_activity, but then I'd have to loop over every task, unpack its args and check them, which scales poorly.
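For comparison, the loop-and-unpack approach described above might look roughly like this in plain Python. It is a sketch under assumptions: the (task_id, name, args_repr) triples are a hypothetical stand-in for whatever you collect from task events, and args_repr is assumed to be a repr string such as "(42, True)". Every lookup touches every recorded task, which is exactly the scalability problem.

```python
import ast


def matching_task_ids(tasks, task_name, wanted_args):
    """Linearly scan (task_id, name, args_repr) triples for matching args.

    args_repr is assumed to be a repr string such as "(42, True)";
    entries that cannot be parsed as a tuple/list literal are skipped.
    """
    wanted = tuple(wanted_args)
    ids = []
    for task_id, name, args_repr in tasks:
        if name != task_name:
            continue
        try:
            # literal_eval parses literals only; it never executes code.
            args = ast.literal_eval(args_repr)
        except (ValueError, SyntaxError):
            # e.g. a repr that was truncated because the args were very long
            continue
        if isinstance(args, (tuple, list)) and tuple(args) == wanted:
            ids.append(task_id)
    return ids


seen = [
    ("a1", "project.activity.create_comment_activity", "(42, True)"),
    ("b2", "project.activity.create_comment_activity", "(7, True)"),
    ("c3", "project.activity.other_task", "(42, True)"),
    ("d4", "project.activity.create_comment_activity", "(42, 'x..."),
]
print(matching_task_ids(seen, "project.activity.create_comment_activity", (42, True)))
# ['a1']
```

Using ast.literal_eval rather than eval means argument strings are never executed; reprs of objects that are not plain literals are simply skipped.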

Either I've missed a trick here, or I have to write my own interface to the results backend and search the tasks with the Django ORM.


Solution

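  • I dug through the django_celery_results module and found that it provides a TaskResult model, a native Django ORM interface to the stored task results. You can therefore search results with TaskResult.objects.filter(). Be aware that in recent versions the task_name and task_args columns are only filled in when Celery's result_extended setting is enabled.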

    As a side note, although the Celery docs cover integrating the django_celery_results module, they do not mention the TaskResult model or its use. The model is documented in the django_celery_results docs themselves.

    Example:

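    from celery import states
    from django_celery_results.models import TaskResult
    
    # Grab all tasks that have not succeeded yet and whose args match the
    # comment being deleted. task_args is a text column holding the repr of
    # the positional args, e.g. "(42, True)". post_save only schedules the
    # task with created=True, so that is the only variant to match.
    results = TaskResult.objects.filter(
        task_args=repr((instance.pk, True)),
        task_name='theden_django.activity.tasks.create_comment_activity',
    ).exclude(
        status=states.SUCCESS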
    )
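    Because task_args is compared as text, the lookup value is effectively the string form of the arguments. A minimal sketch of building those strings, assuming the stored value is repr() of the positional-args tuple (task_args_lookups is a hypothetical helper, not part of django_celery_results). If you ever need to match both created=True and created=False, you need one string per variant, since an expression like True or False collapses to a single value:

```python
def task_args_lookups(pk, flags=(True, False)):
    """Build candidate task_args strings for a task called as task(pk, flag).

    Assumes the result row stores repr() of the positional-args tuple,
    e.g. "(42, True)"; check a real row in your table before relying on it.
    """
    return [repr((pk, flag)) for flag in flags]


# `or` returns its first truthy operand, so this is just True, not "either".
print(True or False)           # True
print(task_args_lookups(42))   # ['(42, True)', '(42, False)']
```

    In the Django query the list could then go to an __in lookup, for example TaskResult.objects.filter(task_args__in=task_args_lookups(instance.pk), task_name=...). If a row in your table shows a different format, adjust the helper to match it.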
    

    For a more complete example, here is the final post_delete handler for my Comment model, which revokes every task whose ID was found, alongside the post_save handler that schedules those tasks:

    
    from celery import states
    from django.dispatch import receiver
    from django.db.models import signals
    from django_celery_results.models import TaskResult
    from theden_django.core import celery_app
    from theden_django.comments.models import Comment
    from theden_django.activity.tasks import create_comment_activity
    
    @receiver(signals.post_save, sender=Comment)
    def schedule_comment_activity_task(sender, instance, created, **kwargs):
        """Creates an activity when comments are created"""
        del sender
        del kwargs
        if created:
            create_comment_activity.delay(instance.pk, created)
    
    @receiver(signals.post_delete, sender=Comment)
    def revoke_comment_activity_task(sender, instance, **kwargs):
        """Revokes any unfinished activity tasks when a comment is deleted"""
        del sender
        del kwargs
    
        # Tasks are only ever scheduled with created=True (see above), so that
        # is the only argument combination to look for.
        task_ids = list(
            TaskResult.objects.filter(
                task_args=repr((instance.pk, True)),
                task_name='theden_django.activity.tasks.create_comment_activity',
            ).exclude(
                status=states.SUCCESS
            ).values_list('task_id', flat=True)
        )
    
        if task_ids:
            # revoke() accepts a list of ids, so one broadcast covers them all.
            celery_app.control.revoke(task_ids)
    
    

    Caveat

    This implementation has a big caveat: it only finds tasks that have been tried at least once, because a TaskResult row is only written after a worker has picked the task up (a failed attempt that retries is recorded with the RETRY state). I still suspect there is a way to do this with the events API. One mitigation is to make the cancellation itself a delayed task, so that it runs after the tasks already in the queue have had a chance to be attempted. A countdown makes this likely rather than guaranteed, so for extra caution I gave the revocation task a delay of 5 minutes; even if a worker is very slow, the tasks should be recorded in the result backend by the time the revocation task runs.
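    To make the caveat concrete, here is a toy model in plain Python. It is not Celery's actual machinery: queue, result_table and the helper functions are hypothetical stand-ins. It shows why a lookup against the result table misses a task that is still waiting in the queue, and why running the lookup later finds it.

```python
from collections import deque

# Toy model, not Celery: a broker queue plus a result table that only gets
# a row once a worker has actually attempted the task.
queue = deque()
result_table = {}  # task_id -> (task_args, status)


def send(task_id, task_args):
    queue.append((task_id, task_args))


def worker_attempt():
    """Run the next message; it fails, so record RETRY and re-queue it."""
    task_id, task_args = queue.popleft()
    result_table[task_id] = (task_args, "RETRY")
    queue.append((task_id, task_args))  # a retry keeps the same task id


def find_unfinished(task_args):
    return [task_id for task_id, (args, status) in result_table.items()
            if args == task_args and status != "SUCCESS"]


send("t1", "(42, True)")
before = find_unfinished("(42, True)")  # t1 is queued but never tried
worker_attempt()
after = find_unfinished("(42, True)")   # visible once the attempt is recorded
print(before, after)                    # [] ['t1']
```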

    handlers.py:

    from django.dispatch import receiver
    from django.db.models import signals
    from theden_django.comments.models import Comment
    from theden_django.activity.tasks import (
        create_comment_activity,
        revoke_pending_activity_tasks,
    )
    
    @receiver(signals.post_save, sender=Comment)
    def schedule_comment_activity_task(sender, instance, created, **kwargs):
        """Creates an activity when comments are created"""
        del sender
        del kwargs
        if created:
            create_comment_activity.delay(instance.pk, created)
    
    
    @receiver(signals.post_delete, sender=Comment)
    def revoke_comment_activity_task(sender, instance, **kwargs):
        """Schedules revocation of unfinished activity tasks for a deleted comment"""
        del sender
        del kwargs
    
        # delay() forwards every argument to the task itself, so execution
        # options such as countdown must be passed through apply_async().
        # The args go over as a pre-built repr string so that JSON
        # serialization cannot turn the tuple into a list.
        revoke_pending_activity_tasks.apply_async(
            args=(
                repr((instance.pk, True)),
                'theden_django.activity.tasks.create_comment_activity',
            ),
            countdown=300,
        )
    
    

    tasks.py:

    from __future__ import absolute_import, unicode_literals
    
    import logging
    from psycopg2 import OperationalError as psycopg2OperationalError
    from django.core.exceptions import ObjectDoesNotExist
    from django.db import Error
    from django.db.utils import OperationalError
    from celery import shared_task
    from django_celery_results.models import TaskResult
    from theden_django.core import celery_app
    from theden_django.comments.models import Comment
    
    LOGGER = logging.getLogger(__name__)
    
    @shared_task(bind=True, max_retries=None)
    def create_comment_activity(self, comment_pk, created):
        """
        | Async task that creates an activity for a comment.
        :param `integer` comment_pk:
        :param `boolean` created:
        :return:
        """
        # self.retry() raises a Retry exception; `raise` makes that explicit.
        try:
            instance = Comment.objects.get(pk=comment_pk)
        except ObjectDoesNotExist as missing_comment_exception:
            LOGGER.error(
                'Unable to find comment %s. Rescheduling task.',
                comment_pk
            )
            raise self.retry(exc=missing_comment_exception, countdown=60)
        except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
            LOGGER.error(
                'Unable to create comment activity for %s due to database error. Rescheduling task.',
                comment_pk
            )
            raise self.retry(exc=db_conn_exception, countdown=60)
        else:
            try:
                # create_activity is the project's own helper (not shown here).
                create_activity(instance, instance.user.pk, created, 'comment')
            except ObjectDoesNotExist as missing_member_exception:
                LOGGER.error(
                    'Unable to find actor for the activity on comment %s. Rescheduling task.',
                    comment_pk
                )
                raise self.retry(exc=missing_member_exception, countdown=60)
            except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
                LOGGER.error(
                    'Unable to create comment activity for %s due to database error. Rescheduling task.',
                    comment_pk
                )
                raise self.retry(exc=db_conn_exception, countdown=60)
    
    
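    @shared_task
    def revoke_pending_activity_tasks(task_args, task_name):
        """
        | Async task that revokes unfinished tasks matching the given args.
        :param `str` task_args: repr of the positional args, e.g. "(42, True)"
        :param `str` task_name: dotted name of the task to revoke
        :return:
        """
        # Not bound (no bind=True): a bound task would receive `self` first.
        task_ids = list(
            TaskResult.objects.filter(
                task_args=task_args,
                task_name=task_name,
            ).exclude(
                status='SUCCESS'
            ).values_list('task_id', flat=True)
        )
    
        if task_ids:
            # revoke() accepts a list of ids, so one broadcast covers them all.
            celery_app.control.revoke(task_ids)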
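    One more pitfall when handing the args to the revocation task: task arguments pass through the message serializer, and with JSON (Celery's default serializer) a tuple arrives on the worker as a list, so its string form becomes "[42, True]" instead of "(42, True)". A minimal stdlib sketch of that round trip, using json as a stand-in for Celery's serializer and assuming task_args holds the repr of the args tuple:

```python
import json

# What the result row is assumed to hold for a task called as task(42, True).
stored_task_args = repr((42, True))

# A tuple sent through a JSON task message arrives as a list...
received_tuple = json.loads(json.dumps((42, True)))
# ...whose string form no longer matches the stored text.
tuple_matches = str(received_tuple) == stored_task_args

# A pre-built string survives the round trip unchanged.
received_string = json.loads(json.dumps(stored_task_args))
string_matches = received_string == stored_task_args

print(received_tuple, tuple_matches, string_matches)  # [42, True] False True
```

    Passing the already-formatted string keeps the filter value identical to what the result row stores, whatever the serializer does to containers.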