I am trying to implement a small AWS Lambda function that captures the CloudWatch logs produced by my AWS Batch job, scans them for errors, and, if any are found, sends an SNS notification via email.
The problem is that I cannot get the latest batch job ID, since a new job ID is generated every time AWS Batch runs. I had it hardcoded in the Lambda's environment variables, but that doesn't work because the job ID is unique to each execution.
This is my code so far:
import re
import os
import boto3
import urllib3

urllib3.disable_warnings()

logs_client = boto3.client('logs')
sns_client = boto3.client('sns')


def get_log_stream_name(batch_job_id):
    response = logs_client.describe_jobs(jobs=[batch_job_id])
    if response['jobs']:
        container = response['jobs'][0].get('container', {})
        log_stream_name = container.get('logStreamName')
        return log_stream_name
    else:
        print(f"No job found with ID {batch_job_id}")
        return None


def get_cloudwatch_logs(log_group_name, log_stream_name, start_time=None,
                        end_time=None, limit=100):
    # Retrieve log events from the latest log stream
    kwargs = {
        'logGroupName': log_group_name,
        'logStreamName': log_stream_name,
        'limit': limit
    }
    if start_time:
        kwargs['startTime'] = int(start_time)
    if end_time:
        kwargs['endTime'] = int(end_time)
    response = logs_client.get_log_events(**kwargs)
    return response['events']


def check_for_errors(log_events, error_keywords=None):
    if error_keywords is None:
        error_keywords = ["ERROR", "Exception", "FAILED"]
    error_pattern = re.compile("|".join(error_keywords), re.IGNORECASE)
    error_logs = [event for event in log_events
                  if error_pattern.search(event['message'])]
    return error_logs


def send_sns_notification(topic_arn, subject, message):
    # Send a notification to an SNS topic
    response = sns_client.publish(
        TopicArn=topic_arn,
        Subject=subject,
        Message=message
    )
    return response


def lambda_handler(event, context):
    # Get the latest log stream
    print("event:", event)
    print("context:", context)
    batch_job_id = os.getenv('BATCH_JOB_ID')
    print('batch_job_id:', batch_job_id)
    log_group_name = os.getenv('LOG_GROUP_NAME')
    print('log_group_name: ', log_group_name)
    sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
    print('sns_topic_arn: ', sns_topic_arn)
    assert batch_job_id is not None
    assert log_group_name is not None
    assert sns_topic_arn is not None
    log_stream_name = get_log_stream_name(batch_job_id)
    assert log_stream_name is not None
    log_events = get_cloudwatch_logs(log_group_name=log_group_name,
                                     log_stream_name=log_stream_name)
    assert log_events is not None
    # Check for errors in logs
    error_logs = check_for_errors(log_events=log_events)
    assert error_logs is not None
    if error_logs:
        error_messages = "\n".join([f"{log['timestamp']}: {log['message']}"
                                    for log in error_logs])
        # Send SNS notification
        subject = f"Errors found in AWS Batch job {batch_job_id} logs"
        message = f"Errors detected in AWS Batch job {batch_job_id}:\n\n{error_messages}"
        send_sns_notification(sns_topic_arn, subject, message)
        print(f"Sent SNS notification for errors in job {batch_job_id}")
    else:
        print("No errors found in CW logs")
In my CFN template, this is what creates the Lambda function:
TradeFileTestLambdaFunction:
  Type: AWS::Serverless::Function
  Properties:
    Description: >
      Handles error events from summa-trade-file-test events.
    Handler: app.lambda_handler
    Runtime: python3.11
    FunctionName:
      !Sub
        - '${TheAppNameForResources}-${TheEnv}'
        - TheEnv: !Ref Env
          TheAppNameForResources: !Ref AppNameForResources
    EphemeralStorage:
      Size: 10240
    Timeout: 900
    Role: !GetAtt MyLambdaExecutionRole.Arn
    Policies:
      - 'AWSLambdaVPCAccessExecutionRole'
      - 'AWSXRayDaemonWriteAccess'
      - CloudWatchLogsReadOnlyAccess
      - SNSPublishPolicy
    Environment:
      Variables:
        ENV: !Ref Env
        SES_IDENTITY_ARN: !Ref SesIdentityArn
        SNS_TOPIC_ARN: !Ref SnsAlertTopic
        LOG_GROUP_NAME: !Ref MyBatchLogGroupName
        BATCH_JOB_ID: !Ref MyBatchJobId
    EventInvokeConfig:
      MaximumRetryAttempts: 0
    Events:
      LogMonitorEvent:
        Type: Schedule
        Properties:
          Schedule: "rate(30 minutes)"
    ReservedConcurrentExecutions: 10

SnsAlertTopic:
  Type: AWS::SNS::Topic
  Properties:
    DisplayName:
      !Sub
        - '${TheEnv}-${TheAppNameForResources}-alert-${TheRegion}'
        - TheEnv: !Ref Env
          TheAppNameForResources: !Ref AppNameForResources
          TheRegion: !Ref AWS::Region
    FifoTopic: false
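One gap worth noting: the template creates the alert topic, but nothing subscribes to it, so the email delivery described at the top also needs a subscription resource. A sketch of what that could look like (the AlertEmail parameter is an assumption, not part of the template above):

SnsAlertEmailSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    Protocol: email
    TopicArn: !Ref SnsAlertTopic
    # Hypothetical parameter holding the recipient address; the
    # subscription must be confirmed from the email SNS sends out.
    Endpoint: !Ref AlertEmail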
Also, it looks like I cannot use describe_jobs; I am getting this error:

{
  "errorMessage": "'CloudWatchLogs' object has no attribute 'describe_jobs'",
  "errorType": "AttributeError",
  "requestId": "44bc882e-7463-46d0-95bc-1660461d2cbd",
  "stackTrace": [
    "  File \"/var/task/app.py\", line 131, in lambda_handler\n    log_stream_name = get_log_stream_name(batch_job_id)\n",
    "  File \"/var/task/app.py\", line 71, in get_log_stream_name\n    response = logs_client.describe_jobs(jobs=[batch_job_id])\n",
    "  File \"/var/lang/lib/python3.11/site-packages/botocore/client.py\", line 918, in __getattr__\n    raise AttributeError(\n"
  ]
}

Any help resolving this is much appreciated.
You need to use the Batch client to query the ID of the latest Batch job execution, and then use the CloudWatch Logs client to retrieve the log stream with logs_client.describe_log_streams:
import boto3

batch_client = boto3.client('batch')

JOB_QUEUE = 'batch-job-queue'


def get_latest_batch_job_id():
    try:
        response = batch_client.list_jobs(
            jobQueue=JOB_QUEUE,
            jobStatus='SUCCEEDED',  # You can change this as you need
            maxResults=1
        )
        jobs = response.get('jobSummaryList', [])
        if jobs:
            latest_job = jobs[0]  # The most recent job
            batch_id = latest_job['jobId']
            return batch_id
        else:
            print(f"No jobs found in {JOB_QUEUE}")
            return None
    except Exception as e:
        print(f"Error retrieving latest batch job: {e}")
        return None
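To tie it together: the AttributeError in the question comes from calling describe_jobs on the CloudWatch Logs client; that API lives on the Batch client. Below is a minimal sketch of the remaining lookup in both variants, describe_jobs on the Batch client (the log stream name is reported under the job's container details) and the logs_client.describe_log_streams route mentioned above. Env var names are reused from the question, and the rest of the handler (fetching events, checking for errors, publishing to SNS) would stay as in the original code.

import os

logs_client = boto3.client('logs')


def get_log_stream_name(batch_job_id):
    # describe_jobs is a Batch API; each job reports its CloudWatch
    # log stream name under 'container'.
    response = batch_client.describe_jobs(jobs=[batch_job_id])
    if response['jobs']:
        return response['jobs'][0].get('container', {}).get('logStreamName')
    return None


def get_latest_log_stream_name(log_group_name):
    # Alternative: ask CloudWatch Logs for the most recently written
    # stream in the Batch job's log group.
    response = logs_client.describe_log_streams(
        logGroupName=log_group_name,
        orderBy='LastEventTime',
        descending=True,
        limit=1
    )
    streams = response.get('logStreams', [])
    return streams[0]['logStreamName'] if streams else None


def lambda_handler(event, context):
    # Resolve the job ID at runtime instead of reading a stale
    # BATCH_JOB_ID environment variable.
    batch_job_id = get_latest_batch_job_id()
    if batch_job_id is None:
        return
    log_group_name = os.getenv('LOG_GROUP_NAME')
    log_stream_name = (get_log_stream_name(batch_job_id)
                       or get_latest_log_stream_name(log_group_name))
    # ...then fetch, scan, and notify as in the question's handler.

The hardcoded JOB_QUEUE could likewise be passed in from the template as an environment variable, mirroring LOG_GROUP_NAME and SNS_TOPIC_ARN, so the function stays environment-agnostic.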