Tags: python, python-3.x, celery

How to generate custom app metrics/monitoring for Celery and Prometheus


I'm trying to generate custom application metrics in Celery and pull them into Prometheus. I'm using danihodovic/celery-exporter to export the Celery task metrics, and those work out of the box. However, I cannot find a way to generate custom application metrics (e.g. counts of internal method calls, method latencies). I've used the standard Python Prometheus client successfully in a Flask application, but I can't make it work in Celery.

I can see that Celery monitoring is tied to Celery events; however, publishing custom events does not produce any new metrics in the Prometheus exporter.

Code:

#!/usr/bin/env python3
import os

from celery import Celery
from celery.events import EventDispatcher
from celery.utils.log import get_task_logger
from kombu import Queue
from prometheus_client import Counter

QUEUE_INPUT = 'test_queue'
TASK_NAME = 'test_task'
QUEUE_ARGS = {'x-queue-mode': 'lazy'}
QUEUE_CONN_STR = os.environ['RMQ_MASTER_CONN_STR']

__LOG = get_task_logger(__name__)

# Define our consumer object
app = Celery(TASK_NAME)

# Define routes
app.conf.update({
    'broker_url': QUEUE_CONN_STR,
    # The route key must match the registered task name ("test_task" below).
    'task_routes': {TASK_NAME: {"queue": QUEUE_INPUT}},
    'task_serializer': 'json',
    'task_send_sent_event': True,
    'worker_send_task_events': True,
    'result_serializer': 'json',
    "broker_pool_limit": 20,
    "task_always_eager": False,
    "result_expires": 2,
    'worker_enable_remote_control': False,
    'worker_prefetch_multiplier': 1
})

app.conf.task_queues = [
    Queue(QUEUE_INPUT, queue_arguments=QUEUE_ARGS),
]

c = Counter('test_counter', 'Number of hits')
d = Counter('test_counter_1', 'Number of msgs')
dispatcher = EventDispatcher(app.connection())
print("Hi")
dispatcher.send("C_INIT")
c.inc()


@app.task(name="test_task", bind=True, max_retries=2)
def test_task(self, payload: dict):
    d.inc()
    dispatcher.send("START_PROCESSING")
    print(payload)
    dispatcher.send("END_PROCESSING")
    

Solution

  • I resorted to using the Prometheus Pushgateway. I'm still searching for a better metrics framework for Celery.
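
For anyone landing here, this is roughly what the Pushgateway approach can look like with the standard prometheus_client package. It is a minimal sketch, not the exact code I ran: the gateway address, the job name, the metric name and the record_and_push helper are all placeholders I made up for illustration. The idea is to keep the counter in its own registry and push that registry after each unit of work.

```python
from prometheus_client import CollectorRegistry, Counter, push_to_gateway

# Assumption: address of your Pushgateway (placeholder, not from the question).
PUSHGATEWAY_ADDR = "localhost:9091"
# Assumption: job label under which the pushed metrics are grouped.
JOB_NAME = "celery_test_task"

# A dedicated registry keeps the pushed payload limited to our own metrics.
registry = CollectorRegistry()
msgs_processed = Counter(
    "test_task_msgs", "Number of messages processed", registry=registry
)


def record_and_push(push=push_to_gateway):
    """Increment the counter, then push the whole registry to the gateway.

    Hypothetical helper; `push` is injectable so it can be swapped in tests.
    """
    msgs_processed.inc()
    try:
        push(PUSHGATEWAY_ADDR, job=JOB_NAME, registry=registry)
    except OSError as exc:
        # URLError subclasses OSError; a metrics outage should not fail the task.
        print(f"metrics push failed: {exc}")
```

Inside the task body you would call record_and_push() where the question's code calls d.inc(). One caveat I am fairly sure matters: push_to_gateway replaces everything stored under the same job and grouping key, so several worker processes pushing with identical labels will overwrite each other's values. Passing a distinct grouping_key per process is one way around that, at the cost of leaving stale groups on the gateway.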