The Lambda function below executes successfully when there's a message in the SQS queue, but there's no CloudWatch log entry for logger.info(message.body). The last CloudWatch log entry is for logger.info(msg_queue.url), where it correctly prints the queue URL, and nothing after that. It does not even start the Step Function. The function runs fine when executed from my local machine, and the Lambda execution role has full access to SQS and Step Functions. What am I missing here? TIA
Ref Links: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.start_execution
import boto3
import hashlib
import logging
import uuid

logger = logging.getLogger("Queue_Listener")
logger.setLevel(logging.INFO)

ssm_client = boto3.client('ssm')
stepfn_client = boto3.client('stepfunctions')
sqs_client = boto3.resource('sqs')

def lambda_handler(event, context):
    step_func = ssm_client.get_parameter(Name='/stepfunctions/test-step-function')
    step_func_arn = step_func['Parameter']['Value']
    logger.info(step_func_arn)
    msg_queue = sqs_client.get_queue_by_name(QueueName='test-queue.fifo')
    logger.info(msg_queue.url)
    for message in msg_queue.receive_messages(MessageAttributeNames=['All'],
                                              ReceiveRequestAttemptId=str(uuid.uuid4()),
                                              WaitTimeSeconds=20):
        logger.info(message.body)
        # checking for message body corruption
        msg_hash = hashlib.md5(message.body.encode())
        digest = msg_hash.hexdigest()
        if digest != message.md5_of_body:
            logger.error("Message body is corrupted")
        else:
            exec_name = "stepfn-" + str(uuid.uuid4())
            response = stepfn_client.start_execution(stateMachineArn=step_func_arn,
                                                     name=exec_name,
                                                     input=message.body,
                                                     traceHeader=exec_name)
            # Let the queue know that the message is processed
            message.delete()
            logger.info(response)
            return {"execution_status": response}
At first I was confused about why, if test-queue.fifo is what triggers the Lambda function, you wouldn't just process event directly rather than polling the queue inside the function logic. Thinking about it, though, I believe this pattern is exactly your problem. Here's what is happening:
When Lambda reads a batch, the messages stay in the queue but are hidden for the length of the queue's visibility timeout. If your function successfully processes the batch, Lambda deletes the messages from the queue. (AWS docs source)
You should use the already-implemented-for-you wiring between Lambda and SQS (the event source mapping). The link above gives an overview of how to set it up. You choose how many messages (at most) are sent to your Lambda per invocation; they are passed to the function in the event, and automatically removed from the queue when the invocation succeeds.
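To illustrate, here is a minimal sketch of what your handler could look like when driven by the SQS trigger. It reads the batch from event["Records"] instead of polling; the start_execution call (using your existing client and ARN) is left as a comment since it needs live AWS resources to run:

```python
import logging

logger = logging.getLogger("Queue_Listener")
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # With an SQS event source mapping, the batch arrives in event["Records"];
    # there is no need to call receive_messages yourself.
    for record in event["Records"]:
        body = record["body"]
        logger.info(body)
        # Start the state machine here, reusing your existing wiring, e.g.:
        # stepfn_client.start_execution(stateMachineArn=step_func_arn,
        #                               name="stepfn-" + record["messageId"],
        #                               input=body)
    # Returning normally tells Lambda the batch succeeded, and Lambda
    # deletes the messages from the queue for you.
    return {"batch_size": len(event["Records"])}
```

Each record also carries md5OfBody if you want to keep your corruption check. Crucially, do not call message.delete() yourself in this model; deletion on success is Lambda's job.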
If you really, really, really want to keep your manual approach, you'll need to either change the trigger of your Lambda (the idea that comes to mind is a CloudWatch metric alarm on the number of pending messages), or change the message visibility timeout, add logic to delete messages from the queue after processing, and make sure your polling + processing + deleting logic plays nicely with Lambda's auto-deletion-on-success. I do not recommend this latter approach :)