aws-lambda boto3 amazon-sqs python-3.8 aws-step-functions

AWS Lambda fails to read from SQS


The Lambda function below executes successfully when there's a message in the SQS queue, but there is no CloudWatch log entry for logger.info(message.body). The last CloudWatch log entry comes from logger.info(msg_queue.url), which correctly prints the queue URL, but nothing appears after that, and the Step Function is never started. The function runs fine when executed from my local machine. The Lambda execution role has full access to SQS and Step Functions. What am I missing here? TIA

Ref links:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.start_execution

import boto3
import hashlib
import logging
import uuid


logger = logging.getLogger("Queue_Listener")
logger.setLevel(logging.INFO)


ssm_client = boto3.client('ssm')
stepfn_client = boto3.client('stepfunctions')
sqs_client = boto3.resource('sqs')


def lambda_handler(event, context):

    step_func = ssm_client.get_parameter(Name='/stepfunctions/test-step-function')
    step_func_arn = step_func['Parameter']['Value']
    logger.info(step_func_arn)

    msg_queue = sqs_client.get_queue_by_name(QueueName='test-queue.fifo')
    logger.info(msg_queue.url)
    
    for message in msg_queue.receive_messages(MessageAttributeNames=['All'],
                                              ReceiveRequestAttemptId=str(uuid.uuid4()),
                                              WaitTimeSeconds=20):
        logger.info(message.body)
        # checking for message body corruption
        msg_hash = hashlib.md5(message.body.encode())
        digest = msg_hash.hexdigest()

        if digest != message.md5_of_body:
            logger.error("Message body is corrupted")
        else:
            exec_name = "stepfn-" + str(uuid.uuid4())
            response = stepfn_client.start_execution(stateMachineArn=step_func_arn,
                                                     name=exec_name,
                                                     input=message.body,
                                                     traceHeader=exec_name
                                                     )
            # Let the queue know that the message is processed
            message.delete()
            logger.info(response)

            return {"execution_status":response}



Solution

  • At first I was confused about why, if test-queue.fifo is what triggers the Lambda function, you wouldn't just process the event directly rather than polling the queue inside the function. Thinking about it more, I believe this pattern is your problem. Here's what is happening:

    1. A message arrives in the SQS queue.
    2. The message is sent to your lambda and triggers its execution. It is no longer visible in the queue, because:

    When Lambda reads a batch, the messages stay in the queue but are hidden for the length of the queue's visibility timeout. If your function successfully processes the batch, Lambda deletes the messages from the queue. (AWS docs source)

    3. The Lambda polls the queue. There's nothing (visible) there, so the loop body never runs and the Lambda returns successfully.
    4. Because the Lambda finished successfully, the message is deleted from the queue.
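To make the sequence above concrete, here is a toy in-memory model of the visibility timeout. It is only a sketch: ToyQueue and its methods are made up for illustration and are not the boto3 or SQS API, and time is passed in as a plain number instead of being read from a clock.

```python
class ToyQueue:
    """Hypothetical in-memory stand-in for an SQS queue (NOT the boto3 API)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        # message id -> (body, time at which the message becomes visible again)
        self._messages = {}

    def send(self, msg_id, body):
        self._messages[msg_id] = (body, 0.0)

    def receive(self, now):
        """Return the visible messages and hide them for visibility_timeout seconds."""
        visible = [(i, b) for i, (b, t) in self._messages.items() if t <= now]
        for msg_id, body in visible:
            self._messages[msg_id] = (body, now + self.visibility_timeout)
        return visible

    def delete(self, msg_id):
        self._messages.pop(msg_id, None)


queue = ToyQueue(visibility_timeout=30)
queue.send("m1", "hello")

# The Lambda service reads the message to build the event; it is now hidden.
batch_for_lambda = queue.receive(now=0)

# The handler polls the same queue one second later and finds nothing visible.
polled_in_handler = queue.receive(now=1)

# The handler returned without error, so the service deletes the batch.
for msg_id, _ in batch_for_lambda:
    queue.delete(msg_id)

# Even after the timeout has passed, the message is gone for good.
remaining = queue.receive(now=100)
```

In this model the only place the message body ever shows up is batch_for_lambda, which corresponds to the event argument your handler is ignoring.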

    You should use the built-in integration between Lambda and SQS instead. The AWS docs linked above give an overview of how to set it up. You can choose the maximum number of messages delivered to your Lambda per invocation; they are passed to the function in the event, and Lambda removes them from the queue automatically once the function succeeds.
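A minimal sketch of what reading from the event could look like, assuming the standard SQS event shape (a "Records" list whose items carry "body", "md5OfBody" and "messageId"). The helper names body_is_intact and process_records are mine, not from your code, and I left out traceHeader for brevity. boto3 is imported inside the handler only so the helpers can be exercised without AWS credentials.

```python
import hashlib
import logging
import uuid

logger = logging.getLogger("Queue_Listener")
logger.setLevel(logging.INFO)


def body_is_intact(record):
    """Compare the MD5 of the body with the md5OfBody field of the SQS record."""
    digest = hashlib.md5(record["body"].encode()).hexdigest()
    return digest == record["md5OfBody"]


def process_records(event, start_execution, state_machine_arn):
    """Start one execution per intact record and return the execution names."""
    started = []
    for record in event.get("Records", []):
        logger.info(record["body"])
        if not body_is_intact(record):
            logger.error("Message body is corrupted: %s", record["messageId"])
            continue
        exec_name = "stepfn-" + str(uuid.uuid4())
        start_execution(stateMachineArn=state_machine_arn,
                        name=exec_name,
                        input=record["body"])
        started.append(exec_name)
    return started


def lambda_handler(event, context):
    import boto3  # lazy import: keeps the helpers above testable offline

    ssm_client = boto3.client("ssm")
    stepfn_client = boto3.client("stepfunctions")
    step_func = ssm_client.get_parameter(Name="/stepfunctions/test-step-function")
    step_func_arn = step_func["Parameter"]["Value"]

    started = process_records(event, stepfn_client.start_execution, step_func_arn)
    # No message.delete() here: Lambda deletes the batch when the handler succeeds.
    return {"started_executions": started}
```

One thing to keep in mind with this shape: a record that is skipped as corrupted is still deleted along with the rest of the batch when the handler returns normally. If you would rather have it retried, raise an exception instead of continuing.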

    If you really want to keep your manual approach, you'll need to do one of two things. Either change what triggers your Lambda (the idea that comes to mind is a CloudWatch metric trigger on the number of pending messages), or change the message visibility timeout, add logic to delete messages from the queue after processing, and make sure your polling, processing, and deleting logic plays nicely with Lambda's automatic deletion on success. I do not recommend this latter approach :)