Search code examples
amazon-web-servicesredisceleryamazon-cloudwatchamazon-elasticache

Celery Redis backend- Make tasks in queue exist as items


Current setup: celery running on docker containers (with our product's code) on an EC2 node, creating and processing tasks. Our backend/broker is Redis, running in AWS' elasticache.

Goal: being able to see the queue size at any given time (similar to flower's monitoring), hopefully through AWS CloudWatch, but not needed. The content of the tasks isn't pertinent, as I am familiar with making a backup of the redis instance, and can parse the backup using local tools to do any analysis needed. Short lived historical data is highly preferred (CloudWatch goes back 2 weeks, and has granularity of 1 min datapoints, this is quite nice).

Based on how I'm aware Flower works, Flower wouldn't be feasible to use due to the amount of security groups/restrictions that we currently have in place. Additionally flower is only monitoring while you're on the page, so there is no historical data saved.

Elasticache already has built in CloudWatch for number of items in redis. This seems to me the best route to achieve the goal. However currently the queue represents one item in redis (no matter how many tasks are in the queue). Here is a sample of the redis backup parsed to json:

[{
"1.api_data__cached_api_route.000":"{\"i1\": 0, \"i2\": 1, \"i3\": 0}",
"1.api_data__cached_api_route.001":"{\"i1\": 0, \"i2\": 0, \"i3\": 0}",
"1.api_data__cached_api_route.002":"{\"i1\": 1, \"i2\": 1, \"i3\": 0}",
"staging_event_default":["{\"id\":\"b28b056c-1268-4577-af8a-9f1948860502\", \"task\":{...}}, "{\"id\":\"52668c46-3972-457a-be3a-6e27eedd26e3\", \"task\":{...}}]
}]

Cloudwatch sees this as 4 items, the 3 cached api routes, and the 1 queue. Even if the queue would have thousands of items, it would still show as 4 items. The discrepancy between #(items in queue) and #(items in queue AND other cached items) is fine, as this monitoring tool will mainly be used to see if the queue is getting horrendously backed up, and the size of the queue will dwarf the number of cached api routes.

To continue along this route, the easiest answer would be if celery has a config option to make each item in the queue it's own redis item. If there's a simple fix or config option for this, it seems the easiest to implement. Here are our current celery config options:

flask_app.config.update(
  CELERY_BROKER_URL=redis_host,
  CELERY_RESULT_BACKEND=redis_host,
  CELERY_QUEUES=queue_manager.queues,
  CELERY_ROUTES=queue_manager.routes,
  CELERY_DEFAULT_QUEUE=queue_manager.default_queue_name,
  CELERY_DEFAULT_EXCHANGE=queue_manager.default_exchange_name)

_celery = celery.Celery(flask_app.import_name,
  backend=flask_app.config['CELERY_RESULT_BACKEND'],
  broker=flask_app.config['CELERY_BROKER_URL'])

opts = {
  'broker_url': redis_host,
  'result_backed': redis_host,
  'task_queues': queue_manager.queues,
  'task_routes': queue_manager.routes,
  'task_default_queue': queue_manager.default_queue_name,
  'task_default_exchange': queue_manager.default_exchange_name,
  'worker_send_task_events': True,

  'task_ignore_result': True,
  'task_store_errors_even_if_ignored': True,
  'result_expires': 3600,

  'worker_redirect_stdouts': False,
  'beat_scheduler': redbeat.RedBeatScheduler,
  'beat_max_loop_interval': 5
}
_celery.conf.update(opts)

Another option I've run across is celery-cloudwatch-logs, which seems to be along the lines of what I'm trying to achieve, however seems to be more aimed at seeing the particular content of each task, and not in aggregate (however I could be wrong there).

If there is no perfect/easy solution that meets the goal, I'll look into forking/building off of celery-cloudwatch to upload just the pertinent information. Our team has inherited much of the code that currently exists, and I have a basic understanding of how celery works, but by no means in-depth.

Thanks in advance for anyone's thoughts, comments, and help!


Solution

  • I'll post what I did here if someone happens to come across it.

    We already had boto3 installed and configured for S3 access elsewhere in the application, making it quite easy to post to CloudWatch.

    I added a method to our Celery class to check the length of the queues using the llen from the redis module:

     @staticmethod
      def check_lengths():
        result = {}
        for q in Celery._queues:
          result[q] = Celery._redis.llen(q)
        return result
    

    Then posting to Cloudwatch was fairly easy as well:

        namespace = "Celery/Queue"
        metrics = []
        for qname, qlen in data.items():
          metric = {}
          metric["MetricName"] = "ItemsInQueue"
          metric["Dimensions"] = [ {"Name": "QueueName", "Value": qname} ]
          metric["Value"] = qlen
          metric["Unit"] = "Count"
    
          metrics.append(metric)
    
        self.cw_client.put_metric_data(Namespace=namespace, MetricData=metrics)
    

    I then ended up using AWS Lambda to send a network request on the minute to an endpoint that then posted the above data to CloudWatch.