I'm stuck on something I imagine is simple. I am trying to find all task ids for a known set of arguments. Is this possible with Celery 4.4's API, or do I have to write my own interface to the results table in the Django ORM and search it myself?
My use case is probably a common one. I have tasks that are scheduled to create Activity and Notification objects for feeds and user profiles. These are set to retry until they succeed, since I need to ensure these objects are created.
However, under high load, when asynchronous tasks may fall behind, a user can post a comment and then delete it before the activity or notification objects are created. In that situation the tasks must either be cancelled or they will be re-queued forever.
The obvious solution seemed to be to retrieve a list of task ids from the events API in the `post_delete` signal and call `revoke()` on those whose arguments contain the object id. However, I have not been able to find an interface in the API docs that allows this. My answer may be in `celery.events.state.Task`, but even if that is an interface for querying tasks, I'm not sure `args` is searchable. My activity tasks are passed the object id and the `created` boolean from Django's `post_save` signal.
I could grab all tasks of the relevant type, such as `project.activity.create_comment_activity`, but then I'd have to loop over every task, unpack its args and check them, which scales badly.
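For comparison, the brute-force approach described above is a linear scan over every task record. A minimal sketch, assuming each record is a plain dict with hypothetical `id`, `name` and `args` keys (this is not a real Celery data structure):

```python
from typing import Any, Dict, Iterable, List, Sequence


def find_task_ids(
    tasks: Iterable[Dict[str, Any]],
    name: str,
    args: Sequence[Any],
) -> List[str]:
    """Return the ids of tasks whose name and positional args both match.

    Every record is inspected, so the cost grows linearly with the number
    of tasks, which is the scalability concern described above.
    """
    wanted = list(args)  # normalise so a tuple and a list compare equal
    return [
        task["id"]
        for task in tasks
        if task["name"] == name and list(task["args"]) == wanted
    ]


# Hypothetical records, for illustration only.
tasks = [
    {"id": "a1", "name": "project.activity.create_comment_activity", "args": [5, True]},
    {"id": "b2", "name": "project.activity.create_comment_activity", "args": [6, True]},
    {"id": "c3", "name": "project.other_task", "args": [5, True]},
]
print(find_task_ids(tasks, "project.activity.create_comment_activity", (5, True)))
# ['a1']
```

Pushing the same match into a database query, as the answer below does, lets an index do the work instead of Python.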
Either I have missed a trick here, or I need to write my own interface to the results backend and search the tasks with the Django ORM.
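I dug through the `django_celery_results` module and found that it provides a `TaskResult` model, a native Django ORM interface to the results table. You can therefore search the results with `TaskResult.objects.filter()`. Depending on the version, `task_name` and `task_args` may only be populated when Celery's `result_extended` setting is enabled.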
As a side note, although the Celery docs cover integrating `django_celery_results`, they do not mention the `TaskResult` model or its use. The model is documented in the module's own docs.
Example:
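```python
from django_celery_results.models import TaskResult

# Grab all tasks that have not succeeded yet and that have the PK of the
# comment being deleted in their args. task_args is a text column, so the
# lookup compares against the stored string form of the args; check a
# stored row to confirm the exact format your version writes.
# (`True or False` would simply evaluate to True; the task is only ever
# scheduled with created=True.)
results = TaskResult.objects.filter(
    task_args=str((instance.pk, True)),
    task_name='theden_django.activity.tasks.create_comment_activity',
).exclude(
    status='SUCCESS'
)
```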
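If you really do want to match either value of `created`, `True or False` cannot express that, because Python evaluates it to `True` before Django sees it. One option is a `task_args__in` lookup over both candidate strings. A minimal sketch, assuming `task_args` stores `repr()` of the positional-args tuple (the stored format may differ between `django_celery_results` versions); `candidate_task_args` is a hypothetical helper name:

```python
from typing import List


def candidate_task_args(pk: int) -> List[str]:
    """Build the string forms of (pk, True) and (pk, False).

    Assumes task_args holds repr() of the args tuple; verify against a
    stored TaskResult row before relying on it.
    """
    return [repr((pk, created)) for created in (True, False)]


# `True or False` short-circuits to True, so it never matches False.
assert (5, True or False) == (5, True)
print(candidate_task_args(5))  # ['(5, True)', '(5, False)']
```

It would then be used as `TaskResult.objects.filter(task_args__in=candidate_task_args(instance.pk), task_name=...)`.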
For a more complete example, this is the final `post_delete` handler for my `Comment` model, which revokes every task whose ID it finds, alongside the `post_save` handler that schedules those tasks:
```python
from django.db.models import signals
from django.dispatch import receiver
from django_celery_results.models import TaskResult

from theden_django.core import celery_app
from theden_django.comments.models import Comment
from theden_django.activity.tasks import create_comment_activity


@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
    """Creates an activity when comments are created"""
    del sender
    del kwargs
    if created:
        create_comment_activity.delay(instance.pk, created)


@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
    """Revokes any queued or retrying activity tasks when a comment is deleted"""
    del sender
    del kwargs
    # The task is only ever scheduled with created=True (see above), so
    # match the string form of (pk, True) stored in the task_args column.
    results = TaskResult.objects.filter(
        task_args=str((instance.pk, True)),
        task_name='theden_django.activity.tasks.create_comment_activity',
    ).exclude(
        status='SUCCESS'
    )
    for result in results:
        # Without terminate=True this stops the task from executing again;
        # it does not kill a run that is already in progress.
        celery_app.control.revoke(task_id=result.task_id)
```
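Celery's `control.revoke()` also accepts a list of task ids, so the loop above could send a single broadcast instead of one per task. A minimal sketch of that idea; the revoke function is passed in so the logic runs without a broker, and `revoke_unfinished` is a hypothetical helper name. In the real handler you would pass `celery_app.control.revoke` and feed it rows from `TaskResult.objects.filter(...).values('task_id', 'status')`:

```python
from typing import Callable, Iterable, List, Mapping


def revoke_unfinished(
    results: Iterable[Mapping[str, str]],
    revoke: Callable[[List[str]], None],
) -> List[str]:
    """Collect ids of results that have not succeeded and revoke them together."""
    task_ids = [row["task_id"] for row in results if row["status"] != "SUCCESS"]
    if task_ids:
        revoke(task_ids)  # one call with the whole list instead of one per id
    return task_ids


# Stand-in for celery_app.control.revoke that just records its argument.
sent = []
rows = [
    {"task_id": "t1", "status": "RETRY"},
    {"task_id": "t2", "status": "SUCCESS"},
    {"task_id": "t3", "status": "STARTED"},
]
print(revoke_unfinished(rows, sent.append))  # ['t1', 't3']
```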
This implementation has a big caveat: it only finds tasks that have been tried at least once, since a row is only written to the results table once a worker records a state for the task. I still think there must be a way to do this with the events API. One mitigation is to make the cancellation itself a delayed task, so that it should run after the tasks already in the queue. For extra caution, I have given this task a 5-minute countdown, so even if a worker is very slow, the tasks should be recorded in the result backend by the time the revocation task runs.
`handlers.py`:
```python
from django.db.models import signals
from django.dispatch import receiver

from theden_django.activity.tasks import (
    create_comment_activity,
    revoke_pending_activity_tasks,
)
from theden_django.comments.models import Comment


@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
    """Creates an activity when comments are created"""
    del sender
    del kwargs
    if created:
        create_comment_activity.delay(instance.pk, created)


@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
    """Schedules revocation of activity tasks for a deleted comment"""
    del sender
    del kwargs
    # .delay() passes every keyword to the task itself, so execution options
    # such as countdown need apply_async(). The args are sent as a string
    # because JSON serialization would turn a tuple into a list.
    revoke_pending_activity_tasks.apply_async(
        args=(
            str((instance.pk, True)),
            'theden_django.activity.tasks.create_comment_activity',
        ),
        countdown=300,
    )
```
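Passing the args tuple itself through `.delay()` or `apply_async()` to the revocation task is fragile: JSON has no tuple type, so with Celery's default JSON serializer a tuple argument reaches the worker as a list, and its string form no longer matches. A small sketch of that round trip using only the standard library (`json_round_trip` is a hypothetical stand-in for what the serializer does to the message body):

```python
import json


def json_round_trip(value):
    """Serialize and deserialize a value the way a JSON message body would."""
    return json.loads(json.dumps(value))


args = (5, True)
received = json_round_trip(args)
print(received)       # [5, True]  (the tuple came back as a list)
print(str(received))  # [5, True]
print(str(args))      # (5, True)

# Converting to the string form before sending keeps it intact.
print(json_round_trip(str(args)))  # (5, True)
```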
`tasks.py`:
```python
from __future__ import absolute_import, unicode_literals

import logging

from psycopg2 import OperationalError as psycopg2OperationalError
from django.core.exceptions import ObjectDoesNotExist
from django.db import Error
from django.db.utils import OperationalError
from celery import shared_task
from django_celery_results.models import TaskResult

from theden_django.core import celery_app
from theden_django.comments.models import Comment

# create_activity() is a project helper whose import is not shown here.

LOGGER = logging.getLogger()


@shared_task(bind=True, max_retries=None)
def create_comment_activity(self, comment_pk, created):
    """
    | Async task that creates an activity for a comment.
    :param `integer` comment_pk:
    :param `boolean` created:
    :return:
    """
    try:
        instance = Comment.objects.get(pk=comment_pk)
    except ObjectDoesNotExist as missing_comment_exception:
        LOGGER.error('Unable to find comment %s. Rescheduling task.', comment_pk)
        # retry() raises celery.exceptions.Retry; raising it explicitly
        # makes it clear that nothing after this line runs.
        raise self.retry(exc=missing_comment_exception, countdown=60)
    except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
        LOGGER.error(
            'Unable to create comment activity for %s due to database error. Rescheduling task.',
            comment_pk
        )
        raise self.retry(exc=db_conn_exception, countdown=60)
    else:
        try:
            create_activity(instance, instance.user.pk, created, 'comment')
        except ObjectDoesNotExist as missing_member_exception:
            LOGGER.error(
                'Unable to find actor for the activity of comment %s. Rescheduling.',
                comment_pk
            )
            raise self.retry(exc=missing_member_exception, countdown=60)
        except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
            LOGGER.error(
                'Unable to create comment activity for %s due to database error. Rescheduling task.',
                comment_pk
            )
            raise self.retry(exc=db_conn_exception, countdown=60)


@shared_task
def revoke_pending_activity_tasks(task_args, task_name):
    """
    | Async task that cancels other tasks.
    :param `str` task_args: stored string form of the target tasks' args
    :param `str` task_name: fully qualified name of the target task
    :return:
    """
    results = TaskResult.objects.filter(
        task_args=task_args,
        task_name=task_name,
    ).exclude(
        status='SUCCESS'
    )
    for result in results:
        celery_app.control.revoke(task_id=result.task_id)
```