Tags: rabbitmq, celery, django-celery

Celery / RabbitMQ - Find out the No Acks - Unacknowledged messages


I am trying to figure out how to get information on unacknowledged messages. Where are these stored? From playing with celery inspect, it seems that once a message is acknowledged it gets processed and you can follow its state. Assuming you have a results backend, you can then see its result. But from the time you call delay until the message is acknowledged, it's in a black hole.

  1. Where are noAcks stored?
  2. How do I find out how "deep" the noAcks list is? In other words, how many are there, and where is my task in the list?

While not exactly germane to the problem, here is what I'm working with.

    import pprint

    from celery.app import app_or_default

    app = app_or_default()
    inspect = app.control.inspect()

    # Now if I want "RECEIVED" jobs (reserved by a worker, not yet started)..
    data = inspect.reserved()

    # or "ACTIVE" jobs (currently executing)..
    data = inspect.active()

    # or "REVOKED" jobs..
    data = inspect.revoked()

    # or scheduled jobs.. (assuming these are time based??)
    data = inspect.scheduled()

    # FILL ME IN FOR UNACK JOBS!!
    # data = inspect.??

    # This will never work for tasks that aren't in one of the above buckets..
    task_ids = []  # placeholder: the IDs of the tasks to look up
    pprint.pprint(inspect.query_task(task_ids))

I really appreciate your advice and help on this.


Solution

  • They are the tasks in inspect.reserved() that have 'acknowledged': False.

    from celery.app import app_or_default
    
    app = app_or_default()
    inspect = app.control.inspect()
    
    # Tasks that have been sent to a worker, and are thus reserved
    # (not available to other workers), but that the worker may or
    # may not have acknowledged yet.
    data = inspect.reserved()
    
    # Example value of data, pretty-printed:
    {'celery.tasks': [{'acknowledged': False,
                       'args': '[]',
                       'delivery_info': {'exchange': 'tasks',
                                         'priority': None,
                                         'routing_key': 'celery'},
                       'hostname': 'celery.tasks',
                       'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8',
                       'kwargs': '{}',
                       'name': 'globalapp.tasks.task_loop_tick',
                       'time_start': None,
                       'worker_pid': None},
                      {'acknowledged': False,
                       'args': '[]',
                       'delivery_info': {'exchange': 'tasks',
                                         'priority': None,
                                         'routing_key': 'celery'},
                       'hostname': 'celery.tasks',
                       'id': '09d5b726-269e-48d0-8b0e-86472d795906',
                       'kwargs': '{}',
                       'name': 'globalapp.tasks.task_loop_tick',
                       'time_start': None,
                       'worker_pid': None},
                      {'acknowledged': False,
                       'args': '[]',
                       'delivery_info': {'exchange': 'tasks',
                                         'priority': None,
                                         'routing_key': 'celery'},
                       'hostname': 'celery.tasks',
                       'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c',
                       'kwargs': '{}',
                       'name': 'globalapp.tasks.task_loop_tick',
                       'time_start': None,
                       'worker_pid': None}],
     'fastlane.tasks': [],
     'images.tasks': [],
     'mailer.tasks': []}
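
To get at the second question (how many unacknowledged tasks there are and where a given task sits), one option is to filter the dictionary returned by inspect.reserved() yourself. The following is only a minimal sketch: the helper names unacked_by_worker and find_task are hypothetical, the sample data is trimmed from the output above, and the entry with the id 'hypothetical-acked-task' is made up so the filter has something to drop. It also assumes that the order of a worker's reserved list is meaningful to you; that order is not a guarantee of execution order, and messages still waiting in the broker queue (not yet delivered to any worker) will not appear here at all.

```python
from typing import Dict, List, Optional, Tuple


def unacked_by_worker(
    reserved: Optional[Dict[str, List[dict]]],
) -> Dict[str, List[dict]]:
    """Keep only the reserved tasks whose 'acknowledged' flag is False."""
    result: Dict[str, List[dict]] = {}
    # inspect.reserved() may return None if no worker replies, so fall
    # back to an empty dict in that case.
    for worker, tasks in (reserved or {}).items():
        result[worker] = [t for t in tasks if not t.get("acknowledged", False)]
    return result


def find_task(
    unacked: Dict[str, List[dict]], task_id: str
) -> Optional[Tuple[str, int]]:
    """Return (worker name, index in that worker's unacked list) or None."""
    for worker, tasks in unacked.items():
        for index, task in enumerate(tasks):
            if task.get("id") == task_id:
                return worker, index
    return None


# Sample data shaped like the inspect.reserved() output above (trimmed).
# The second entry is hypothetical and only demonstrates the filtering.
sample_reserved = {
    "celery.tasks": [
        {"acknowledged": False, "id": "527961d4-639f-4002-9dc6-7488dd8c8ad8"},
        {"acknowledged": True, "id": "hypothetical-acked-task"},
        {"acknowledged": False, "id": "09d5b726-269e-48d0-8b0e-86472d795906"},
    ],
    "mailer.tasks": [],
}

unacked = unacked_by_worker(sample_reserved)

# "Depth" of the unacknowledged list, per worker.
depth = {worker: len(tasks) for worker, tasks in unacked.items()}
print(depth)

# Position of one task within its worker's unacknowledged list.
print(find_task(unacked, "09d5b726-269e-48d0-8b0e-86472d795906"))
```

Against a live setup you would pass app.control.inspect().reserved() in place of sample_reserved.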