amazon-web-services aws-lambda amazon-sqs

Processing AWS Lambda messages in Batches


I am wondering about something, and I really can't find information about it. Maybe it is not the way to go, but I would just like to know.

It is about Lambda working in batches. I know I can set up Lambda to consume messages in batches. In my Lambda function I iterate over each message, and if one fails, Lambda exits and the cycle starts again.

I am wondering about a slightly different approach. Let's assume I have three messages: A, B and C. I also take them in batches. Now if message B fails (e.g. an API call failed), I return message B to SQS and keep processing message C.

Is it possible? If it is, is it a good approach? Because I see that I need to implement some extra complexity in Lambda and what not.

Thanks


Solution

  • There's an excellent article here. The relevant parts for you are...

    • Using a batchSize of 1, so that messages succeed or fail on their own.
    • Making sure your processing is idempotent, so reprocessing a message isn't harmful, outside of the extra processing cost.
    • Handling errors within your function code, perhaps by catching them and sending the message to a dead-letter queue for further processing.
    • Calling the DeleteMessage API manually within your function after successfully processing a message.

    The last bullet point is how I've managed to deal with the same problem. Instead of returning an error immediately, record that an error has occurred and continue handling the rest of the messages in the batch. At the end of processing, return or raise an error so that the SQS -> Lambda trigger knows not to delete the failed messages. All successful messages will already have been deleted by your Lambda handler.

    import logging
    import os

    import boto3

    logger = logging.getLogger(__name__)
    sqs = boto3.client('sqs')

    # Assumption: the queue URL is passed in through an environment variable.
    QUEUE_URL = os.environ['QUEUE_URL']

    def handler(event, context):
        failed = False

        for msg in event['Records']:
            try:
                # Do something with the message.
                handle_message(msg)
            except Exception:
                # Ok it failed, but allow the loop to finish.
                logger.exception('Failed to handle message')
                failed = True
            else:
                # The message was handled successfully. We can delete it now.
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=msg['receiptHandle'],
                )

        # It doesn't matter what the error is. You just want to raise here
        # to ensure the trigger doesn't delete any of the failed messages.
        if failed:
            raise RuntimeError('Failed to process one or more messages')

    def handle_message(msg):
        ...
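
As an alternative to deleting messages by hand, Lambda's SQS event source mapping also supports partial batch responses. This is a different technique from the one above, shown here only as a minimal sketch. It assumes `ReportBatchItemFailures` has been enabled in the mapping's function response types; without that setting the return value is ignored. The `handle_message` body below is a hypothetical placeholder that fails on purpose for one input.

```python
def handle_message(record):
    # Hypothetical placeholder: fail when the body is the string "fail".
    if record['body'] == 'fail':
        raise ValueError('simulated failure')


def handler(event, context):
    failures = []

    for record in event['Records']:
        try:
            handle_message(record)
        except Exception:
            # Report only this message as failed; keep processing the rest.
            failures.append({'itemIdentifier': record['messageId']})

    # Messages listed here return to the queue; the others are deleted
    # by the trigger. An empty list means the whole batch succeeded.
    return {'batchItemFailures': failures}
```

For a batch of A, B and C where only B fails, the handler reports B alone, so A and C are not reprocessed and no explicit `delete_message` call is needed.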