Tags: kubernetes, amazon-sqs, amazon-eks, horizontalpodautoscaler

Is it possible to get the total number of messages in an SQS queue?


I see there are two separate metrics, ApproximateNumberOfMessagesVisible and ApproximateNumberOfMessagesNotVisible.

Using the number of visible messages causes the processing pods to be targeted for termination as soon as they pick up a message from the queue, because the message is then no longer visible. If I use the number of messages not visible instead, it will not scale up, since messages only count as not visible once a consumer has already received them.

I'm trying to scale a Kubernetes service using a Horizontal Pod Autoscaler and an external metric from SQS. Here is the ExternalMetric template:

apiVersion: metrics.aws/v1alpha1
kind: ExternalMetric
metadata:
  name: metric-name
spec:
  name: metric-name
  queries:
    - id: metric_name
      metricStat:
        metric:
          namespace: "AWS/SQS"
          metricName: "ApproximateNumberOfMessagesVisible"
          dimensions:
            - name: QueueName
              value: "queue_name"
        period: 60
        stat: Average
        unit: Count
      returnData: true

Here is the HPA template:

kind: HorizontalPodAutoscaler
apiVersion: autoscaling/v2beta1
metadata:
  name: hpa-name
spec:
  scaleTargetRef:
    apiVersion: apps/v1beta1
    kind: Deployment
    name: deployment-name
  minReplicas: 1
  maxReplicas: 50
  metrics:
  - type: External
    external:
      metricName: metric-name
      targetAverageValue: 1

The problem would be solved if I could define another custom metric that is the sum of these two metrics. How else can I solve this problem?


Solution

  • We used a Lambda to fetch the two metrics and publish a custom metric that is the sum of in-flight and waiting messages. We trigger this Lambda with a CloudWatch Events rule at whatever frequency you want: https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#rules:action=create

    Here is the Lambda code for reference:

    const AWS = require('aws-sdk');
    const cloudwatch = new AWS.CloudWatch({region: ''});  // fill region here
    const sqs = new AWS.SQS();  // uses the Lambda's default region
    
    const SQS_URL = ''; // fill queue url here
    
    // Fetch all queue attributes, including the approximate message counts
    async function getSqsMetric(queueUrl) {
        var params = {
          QueueUrl: queueUrl,
          AttributeNames: ['All']
        };
        return new Promise((res, rej) => {
            sqs.getQueueAttributes(params, function(err, data) {
                if (err) rej(err);
                else res(data);
            });
        });
    }
    
    // Build a PutMetricData payload for the combined queue depth
    function buildMetric(numMessages) {
      return {
        Namespace: 'yourcompany-custom-metrics',
        MetricData: [{
          MetricName: 'mymetric',
          Dimensions: [{
              Name: 'env',
              Value: 'prod'
          }],
          Timestamp: new Date(),
          Unit: 'Count',
          Value: numMessages
        }]
      }
    }
    
    // Publish the custom metric; errors are logged but do not fail the handler
    async function pushMetrics(metrics) {
      await new Promise((res) => cloudwatch.putMetricData(metrics, (err, data) => {
        if (err) {
          console.log('err', err, err.stack); // an error occurred
          res(err);
        } else {
          console.log('response', data);           // successful response
          res(data);
        }
      }));
    }
    
    exports.handler = async (event) => {
        console.log('Started');
        const sqsMetrics = await getSqsMetric(SQS_URL).catch(console.error);
        var queueSize = null;
        if (sqsMetrics) {
            console.log('Got sqsMetrics', sqsMetrics);
            if (sqsMetrics.Attributes) {
              // total = visible (waiting) + not visible (in-flight)
              queueSize = parseInt(sqsMetrics.Attributes.ApproximateNumberOfMessages, 10) +
                  parseInt(sqsMetrics.Attributes.ApproximateNumberOfMessagesNotVisible, 10);
              console.log('Pushing', queueSize);
              await pushMetrics(buildMetric(queueSize))
            } 
        } else {
            console.log('Failed fetching sqsMetrics');
        }
        const response = {
            statusCode: 200,
            body: JSON.stringify('Pushed ' + queueSize),
        };
        return response;
    };
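
    With the Lambda publishing the combined count, the adapter can be pointed at the custom metric instead of the built-in SQS ones. Below is a sketch of the updated ExternalMetric, reusing the template from the question and the namespace, metric name and dimension the Lambda publishes (yourcompany-custom-metrics / mymetric / env=prod); the resource name total-queue-messages is just a placeholder:

    apiVersion: metrics.aws/v1alpha1
    kind: ExternalMetric
    metadata:
      name: total-queue-messages
    spec:
      name: total-queue-messages
      queries:
        - id: total_queue_messages
          metricStat:
            metric:
              namespace: "yourcompany-custom-metrics"
              metricName: "mymetric"
              dimensions:
                - name: env
                  value: "prod"
            period: 60
            stat: Average
            unit: Count
          returnData: true

    The HPA from the question then stays the same apart from metricName: total-queue-messages, so pods keep scaling on visible plus in-flight messages and are no longer torn down the moment they receive work.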