Tags: amazon-web-services, aws-lambda, amazon-sqs, retry-logic, aws-sqs-fifo

Failure handling for Lambdas triggered by FIFO SQS queues


Our system uses FIFO SQS queues to drive Lambda functions. Here's the relevant part of our SAM template:

  EventParserTriggeringQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600  # 14 Days (max)
      FifoQueue: true
      ContentBasedDeduplication: true
      VisibilityTimeout: 240  # Must be > EventParser Timeout
      Tags:
        - Key: "datadog"
          Value: "true"
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt EventParserDeadLetters.Arn
        maxReceiveCount: 1

  EventParser:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: lambdas/event_parser_lambda/
      Handler: event_parser.lambda_handler
      Timeout: 120
      Events:
        EventParserTriggeringQueueEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt EventParserTriggeringQueue.Arn
            BatchSize: 1
            ScalingConfig:
              MaximumConcurrency: 2
      Policies:
        Statement:
          - Action:
              - ssm:GetParametersByPath
              - ssm:GetParameters
              - ssm:GetParameter
            Effect: Allow
            Resource:
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/datadog/api_key"
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/sentry/dsn"
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${AWS::StackName}/*"
          - Action:
              - sqs:DeleteMessage
              - sqs:GetQueueAttributes
              - sqs:ReceiveMessage
            Effect: Allow
            Resource: !GetAtt EventParserTriggeringQueue.Arn

  EventParserDeadLetters:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600  # 14 Days (max)
      FifoQueue: true
      ContentBasedDeduplication: true
      Tags:
        - Key: "datadog"
          Value: "true"
        - Key: "deadletter"
          Value: "true"

What I'm looking for is retry behavior that looks like:
  • If a lambda fails, the message is retried immediately.
  • If a message fails more than the maximum allowed number of times, it goes to the dead-letter queue immediately and the next message can be tried right away.
Instead, the behavior we're seeing is:
  • If a lambda fails, it is retried only after the visibility timeout period. This period is necessarily longer than the lambda's typical runtime, so a lot of delay is imposed here.
  • If a lambda fails more than the maximum allowed failure count, the message only goes on a dead-letter queue after the visibility timeout period.
First, let me check my understanding of how the system works, because it's not really documented in any one place:
  • For an SQS-driven lambda, the lambda runtime calls ReceiveMessage on the SQS queue periodically. In our system, it looks like the default is about once every 10 seconds.
  • If there's a message available, the queue returns it.
  • When the queue returns a message, it starts the clock on the visibility timeout.
    • Until the visibility timeout has elapsed, ReceiveMessage calls to the queue (for the same message group ID) come back empty. (This is a FIFO feature. For non-FIFO queues, only the received messages are hidden.)
    • When the visibility timeout has elapsed, if the head message has been received at least maxReceiveCount times, the queue gives up on the message, optionally placing it on a dead-letter queue.
  • The lambda runtime passes the message along to the lambda function.
  • If the function succeeds, the runtime calls DeleteMessage on the queue. This removes the head message, and also makes the next message available (i.e. it clears the visibility timeout).
  • If the function fails, the runtime carries on as though nothing has happened:
    • It keeps polling the queue periodically, and gets empty responses to ReceiveMessage until the visibility timeout has elapsed.
    • Once the visibility timeout has passed, the queue returns the same message again. Or, if the message has already been received maxReceiveCount times, the queue returns the next message.
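
To make that mental model concrete, here is a toy simulation of it in plain Python. It models only the behavior described in the list above (one message group, a fixed polling interval, no real SQS calls); ToyFifoQueue and simulate are made-up names, and the numbers mirror the template (240-second visibility timeout, maxReceiveCount of 1, polling every 10 seconds).

```python
from collections import deque


class ToyFifoQueue:
    """Toy model of one FIFO message group; this is not the SQS API."""

    def __init__(self, messages, visibility_timeout, max_receive_count):
        self.messages = deque(messages)
        self.visibility_timeout = visibility_timeout
        self.max_receive_count = max_receive_count
        self.receive_count = 0  # receives of the current head message
        self.hidden_until = 0   # the group is blocked until this time
        self.dead_letters = []

    def receive(self, now):
        """Return the head message, or None while the group is blocked."""
        if now < self.hidden_until or not self.messages:
            return None
        # The timeout has elapsed: a head that has used up its receives is
        # dead-lettered and the next message becomes the head.
        if self.receive_count >= self.max_receive_count:
            self.dead_letters.append(self.messages.popleft())
            self.receive_count = 0
            if not self.messages:
                return None
        self.receive_count += 1
        self.hidden_until = now + self.visibility_timeout
        return self.messages[0]

    def delete(self):
        """Successful processing: drop the head and unblock the group."""
        self.messages.popleft()
        self.receive_count = 0
        self.hidden_until = 0


def simulate(queue, handler, poll_interval, end_time):
    """Poll every poll_interval seconds; return the (time, message) receives."""
    log = []
    for now in range(0, end_time, poll_interval):
        message = queue.receive(now)
        if message is None:
            continue
        log.append((now, message))
        if handler(message):  # True means the function succeeded
            queue.delete()
    return log


queue = ToyFifoQueue(["bad", "good"], visibility_timeout=240, max_receive_count=1)
log = simulate(queue, lambda m: m != "bad", poll_interval=10, end_time=300)
print(log)                 # [(0, 'bad'), (240, 'good')]
print(queue.dead_letters)  # ['bad']
```

In this model the "good" message waits the full 240 seconds behind the failed one, which matches the delay we observe.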
One solution I have considered:

Basically, put the lambda in charge:

  • Put retry logic in a loop in the lambda
  • If the lambda gets through its loop without a success, have it explicitly send the message to an SQS queue that we'll use for dead letters. This queue wouldn't be configured as a DLQ; we'd just use it as one.
  • The lambda always returns successfully, so the lambda runtime always deletes the message from the FIFO input queue.
Is this the best I can do?

One serious issue with this approach is that lambda functions can't run longer than 15 minutes, and I worry that retrying 5 times inside a single invocation could push us past that limit.
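
For reference, here is a rough sketch of what I have in mind, with a time budget check to address the 15-minute worry. All the function names are hypothetical. The only Lambda-specific call is context.get_remaining_time_in_millis(), and the only SQS call is send_message on a client that is passed in. The 10-second safety margin and the 5 attempts are arbitrary, and the sketch assumes the dead-letter queue is FIFO with content-based deduplication, as in the template above.

```python
import json
import time


def process_with_retries(body, process, send_to_dead_letters, remaining_ms,
                         max_attempts=5, safety_margin_ms=10_000, backoff_s=0.0):
    """Try process(body) up to max_attempts times; dead-letter it on failure.

    Always returns normally so the runtime deletes the message from the
    input queue. Returns True on success, False if it was dead-lettered.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        # Give up early if another attempt might hit the function timeout.
        if remaining_ms() < safety_margin_ms:
            last_error = last_error or "out of time before the first attempt"
            break
        try:
            process(body)
            return True
        except Exception as exc:  # broad on purpose: any failure is retried
            last_error = repr(exc)
            if attempt < max_attempts and backoff_s:
                time.sleep(backoff_s)
    send_to_dead_letters(body, last_error)
    return False


def send_to_dead_letter_queue(body, error, queue_url, group_id, sqs_client):
    """Send a failed message to the queue we treat as a DLQ.

    sqs_client is assumed to be a boto3 SQS client. MessageGroupId is
    required because the queue is FIFO.
    """
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"body": body, "error": error}),
        MessageGroupId=group_id,
    )


def make_handler(process, send_to_dead_letters):
    """Build a lambda_handler that never raises for processing failures."""

    def lambda_handler(event, context):
        for record in event["Records"]:
            process_with_retries(
                json.loads(record["body"]),
                process,
                send_to_dead_letters,
                remaining_ms=context.get_remaining_time_in_millis,
            )

    return lambda_handler
```

The trade-off is that a message that runs out of time is dead-lettered without using all of its attempts, and the function's Timeout has to be sized for the worst case of all attempts plus the margin.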


Solution

  • This answer is Python-specific, but it should be easy enough to translate to other languages.

    Broadly, yes, the lambda has to take responsibility for the queue handling when there are failures.

    I wrote the following decorator, which I attach to all the entry point functions for our SQS-triggered lambdas:

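    import functools
    import json
    from typing import Callable

    import boto3

    # Assumed module-level client; the snippet uses sqs_client without showing where it is created.
    sqs_client = boto3.client("sqs")


    def sqs_triggered(func: Callable) -> Callable:
        """
        Decorator function for lambdas invoked by SQS messages.

        - Because SQS events can be batched, this decorator will invoke the function once per record.
        - If the function fails, this decorator will adjust the visibility timeout of the failed record and
          of every record after it to 0. If there were any successful records, it will delete them from the
          queue before propagating the error.
        """

        @functools.wraps(func)
        def decorator(event, *args, **kwargs):
            for i, record in enumerate(event["Records"]):
                try:
                    func(json.loads(record["body"]), *args, **kwargs)
                except Exception:
                    # Only touch the queue when the record says which queue it came from.
                    if "eventSourceARN" in record:
                        # The queue name is the last component of the ARN.
                        queue_name = record["eventSourceARN"].split(":")[-1]
                        queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
                        # Records before index i were processed successfully: delete them.
                        for successful_record in event["Records"][:i]:
                            sqs_client.delete_message(
                                QueueUrl=queue_url,
                                ReceiptHandle=successful_record["receiptHandle"],
                            )
                        # The failed record and the unprocessed ones become visible again at once.
                        for failed_record in event["Records"][i:]:
                            sqs_client.change_message_visibility(
                                QueueUrl=queue_url,
                                ReceiptHandle=failed_record["receiptHandle"],
                                VisibilityTimeout=0,
                            )
                    raise  # re-raise with the original traceback

        return decorator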
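
    To see what the decorator does with a batch where one record fails, here is a small sketch that runs without AWS. FakeSqs is a made-up stub that only records calls, sqs_triggered_with is a hypothetical variant of the decorator that takes the client as a parameter, and the ARN and receipt handles are invented for the example.

```python
import functools
import json


class FakeSqs:
    """Hypothetical stand-in for a boto3 SQS client; it only records calls."""

    def __init__(self):
        self.calls = []

    def get_queue_url(self, QueueName):
        return {"QueueUrl": f"https://example.invalid/{QueueName}"}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.calls.append(("delete", ReceiptHandle))

    def change_message_visibility(self, QueueUrl, ReceiptHandle, VisibilityTimeout):
        self.calls.append(("visibility", ReceiptHandle, VisibilityTimeout))


def sqs_triggered_with(sqs_client):
    """Same logic as sqs_triggered, but with the client passed in."""

    def wrap(func):
        @functools.wraps(func)
        def decorator(event, *args, **kwargs):
            for i, record in enumerate(event["Records"]):
                try:
                    func(json.loads(record["body"]), *args, **kwargs)
                except Exception:
                    if "eventSourceARN" in record:
                        name = record["eventSourceARN"].split(":")[-1]
                        url = sqs_client.get_queue_url(QueueName=name)["QueueUrl"]
                        for done in event["Records"][:i]:
                            sqs_client.delete_message(
                                QueueUrl=url, ReceiptHandle=done["receiptHandle"])
                        for pending in event["Records"][i:]:
                            sqs_client.change_message_visibility(
                                QueueUrl=url,
                                ReceiptHandle=pending["receiptHandle"],
                                VisibilityTimeout=0)
                    raise

        return decorator

    return wrap


fake = FakeSqs()


@sqs_triggered_with(fake)
def handler(body, context):
    if not body["ok"]:
        raise ValueError("cannot parse event")


arn = "arn:aws:sqs:us-east-1:123456789012:events.fifo"  # made-up ARN
event = {"Records": [
    {"body": json.dumps({"ok": ok}), "receiptHandle": f"rh-{n}", "eventSourceARN": arn}
    for n, ok in enumerate([True, False, True])
]}

try:
    handler(event, None)
except ValueError:
    pass  # the decorator re-raises after cleaning up the queue

print(fake.calls)
# [('delete', 'rh-0'), ('visibility', 'rh-1', 0), ('visibility', 'rh-2', 0)]
```

    The first record is deleted, and the failed record plus the one that was never attempted are made visible again immediately. With BatchSize: 1, as in the question, only the visibility change applies.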
    
    

    Note that this means the lambda will need some extra permissions. If you're using CloudFormation (here, a SAM template), that looks like:

    Resources:
      FooBar:
        Type: AWS::Serverless::Function
        ...
        Properties:
          Policies:
            Statement:
              - Action:
                  - sqs:ChangeMessageVisibility
                  - sqs:GetQueueUrl
                Effect: Allow
                Resource: !GetAtt FooBarQueue.Arn