Search code examples
amazon-web-services · aws-lambda · aws-cloudformation

How to query cloudwatch logs of AWS Batch using boto3


I am trying to implement a small AWS lambda function that will capture the cloudwatch logs created by my AWS batch program and look for any Errors in it and if so, send an SNS notification via email.

Somehow I am unable to get the latest batch_id since Batch ID keeps changing every time AWS Batch runs. I had this hardcoded in Environment variables of AWS lambda but it doesn't work since batchId is unique every time batch is executed.

This is my code so far.

import re
import os
import boto3
import urllib3
urllib3.disable_warnings()
logs_client = boto3.client('logs')
sns_client = boto3.client('sns')

def get_log_stream_name(batch_job_id):
    """Return the CloudWatch log stream name for the given AWS Batch job.

    Bug fix: ``describe_jobs`` is an AWS *Batch* API, not a CloudWatch Logs
    API — calling it on the logs client raises the AttributeError quoted in
    the question. A Batch client is created here instead. The original
    body's indentation (dedented to column 0) is also repaired.

    Args:
        batch_job_id: The AWS Batch job id to look up.

    Returns:
        The job container's log stream name, or None when the job is not
        found or has no log stream yet.
    """
    batch_client = boto3.client('batch')
    response = batch_client.describe_jobs(jobs=[batch_job_id])
    if response['jobs']:
        container = response['jobs'][0].get('container', {})
        return container.get('logStreamName')
    print(f"No job found with ID {batch_job_id}")
    return None

def get_cloudwatch_logs(log_group_name, log_stream_name, start_time=None,
                        end_time=None, limit=100):
    """Retrieve up to ``limit`` log events from one CloudWatch log stream.

    Args:
        log_group_name: CloudWatch Logs group name.
        log_stream_name: Stream within the group to read.
        start_time: Optional inclusive start, epoch milliseconds.
        end_time: Optional exclusive end, epoch milliseconds.
        limit: Maximum number of events to return (default 100).

    Returns:
        The ``events`` list from the GetLogEvents response.
    """
    kwargs = {
        'logGroupName': log_group_name,
        'logStreamName': log_stream_name,
        'limit': limit,
    }
    # Bug fix: compare against None instead of relying on truthiness so a
    # legitimate epoch timestamp of 0 is not silently dropped.
    if start_time is not None:
        kwargs['startTime'] = int(start_time)
    if end_time is not None:
        kwargs['endTime'] = int(end_time)

    response = logs_client.get_log_events(**kwargs)
    return response['events']

def check_for_errors(log_events, error_keywords=None):
    """Filter log events whose message contains any error keyword.

    Args:
        log_events: List of CloudWatch event dicts, each with a 'message' key.
        error_keywords: Optional list of literal keywords to search for;
            defaults to ["ERROR", "Exception", "FAILED"]. Matching is
            case-insensitive.

    Returns:
        The sub-list of events whose message matched a keyword (original
        order preserved).
    """
    if error_keywords is None:
        error_keywords = ["ERROR", "Exception", "FAILED"]
    # Bug fix: re.escape each keyword so values containing regex
    # metacharacters (e.g. "timeout(s)") are matched literally instead of
    # being interpreted as patterns.
    error_pattern = re.compile(
        "|".join(map(re.escape, error_keywords)), re.IGNORECASE)
    return [event for event in log_events
            if error_pattern.search(event['message'])]

def send_sns_notification(topic_arn, subject, message):
    """Publish a notification to an SNS topic.

    Bug fix: the original body's indentation was mangled (the publish
    kwargs and the ``return`` were dedented out of the function), which is
    a SyntaxError in Python.

    Args:
        topic_arn: ARN of the SNS topic to publish to.
        subject: Email subject line.
        message: Message body.

    Returns:
        The SNS ``publish`` response dict.
    """
    response = sns_client.publish(
        TopicArn=topic_arn,
        Subject=subject,
        Message=message,
    )
    return response

def lambda_handler(event, context):
    """Lambda entry point: scan a Batch job's CloudWatch logs for errors.

    Reads BATCH_JOB_ID, LOG_GROUP_NAME and SNS_TOPIC_ARN from the Lambda
    environment, fetches the job's log stream, and publishes an SNS
    notification when any error keyword is found in the log events.

    Raises:
        ValueError: when a required environment variable is missing or the
            job's log stream cannot be resolved.
    """
    print("event:", event)
    print("context:", context)

    batch_job_id = os.getenv('BATCH_JOB_ID')
    print('batch_job_id:', batch_job_id)
    log_group_name = os.getenv('LOG_GROUP_NAME')
    print('log_group_name: ', log_group_name)
    # Bug fix: the topic ARN is an environment variable — dicts have no
    # .getenv() method, so the original `event.getenv(...)` raised
    # AttributeError. os.getenv is correct here.
    sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
    print('sns_topic_arn: ', sns_topic_arn)

    # Validate configuration with explicit raises instead of `assert`,
    # which is stripped when Python runs with -O.
    missing = [name for name, value in (
        ('BATCH_JOB_ID', batch_job_id),
        ('LOG_GROUP_NAME', log_group_name),
        ('SNS_TOPIC_ARN', sns_topic_arn),
    ) if value is None]
    if missing:
        raise ValueError(f"Missing required environment variables: {missing}")

    log_stream_name = get_log_stream_name(batch_job_id)
    if log_stream_name is None:
        raise ValueError(f"No log stream found for Batch job {batch_job_id}")

    log_events = get_cloudwatch_logs(log_group_name=log_group_name,
                                     log_stream_name=log_stream_name)

    # Check for errors in logs. Bug fix: the original if/else structure was
    # mangled (dangling `else:` at module level) — the notification block
    # belongs inside `if error_logs:`.
    error_logs = check_for_errors(log_events=log_events)
    if error_logs:
        error_messages = "\n".join(
            f"{log['timestamp']}: {log['message']}" for log in error_logs)
        subject = f"Errors found in AWS Batch job {batch_job_id} logs"
        message = (f"Errors detected in AWS Batch job {batch_job_id}:"
                   f"\n\n{error_messages}")
        send_sns_notification(sns_topic_arn, subject, message)
        print(f"Sent SNS notification for errors in job {batch_job_id}")
    else:
        print("No errors found in CW logs")

In my CFN template I have this for creating the lambda function.

TradeFileTestLambdaFunction:
  Type: AWS::Serverless::Function
  Properties:
    Description: >
      Handles error events from summa-trade-file-test events.
    Handler: app.lambda_handler
    Runtime: python3.11
    FunctionName:
      !Sub
      - '${TheAppNameForResources}-${TheEnv}'
      - TheEnv: !Ref Env
        TheAppNameForResources: !Ref AppNameForResources
    EphemeralStorage:
      Size: 10240
    Timeout: 900
    # NOTE(review): SAM ignores the Policies list when an explicit Role is
    # supplied — confirm MyLambdaExecutionRole grants logs:GetLogEvents,
    # batch:DescribeJobs and sns:Publish, or remove Role and rely on Policies.
    Role: !GetAtt MyLambdaExecutionRole.Arn
    Policies:
      - 'AWSLambdaVPCAccessExecutionRole'
      - 'AWSXRayDaemonWriteAccess'
      - CloudWatchLogsReadOnlyAccess
      - SNSPublishPolicy
    Environment:
      # Bug fix: the variable keys must be indented *under* Variables;
      # in the original they sat at the same level, which is invalid.
      Variables:
        ENV: !Ref Env
        SES_IDENTITY_ARN: !Ref SesIdentityArn
        SNS_TOPIC_ARN: !Ref SnsAlertTopic
        LOG_GROUP_NAME: !Ref MyBatchLogGroupName
        BATCH_JOB_ID: !Ref MyBatchJobId
    EventInvokeConfig:
      MaximumRetryAttempts: 0
    Events:
      LogMonitorEvent:
        Type: Schedule
        Properties:
          Schedule: "rate(30 minutes)"
    ReservedConcurrentExecutions: 10

# SNS topic referenced by the Lambda's SNS_TOPIC_ARN environment variable;
# the handler publishes its error notifications here.
SnsAlertTopic:
  Type: AWS::SNS::Topic
  Properties:
    # Display name: <env>-<app>-alert-<region>, e.g. "prod-myapp-alert-us-east-1".
    DisplayName:
      !Sub
        - '${TheEnv}-${TheAppNameForResources}-alert-${TheRegion}'
        - TheEnv: !Ref Env
          TheAppNameForResources: !Ref AppNameForResources
          TheRegion: !Ref AWS::Region
    # Standard (non-FIFO) topic.
    FifoTopic: false

Also it looks like I cannot use describe_jobs...I am getting this error:

{ "errorMessage": "'CloudWatchLogs' object has no attribute 'describe_jobs'", "errorType": "AttributeError", "requestId": "44bc882e-7463-46d0-95bc-1660461d2cbd", "stackTrace": [ " File "/var/task/app.py", line 131, in lambda_handler\n log_stream_name = get_log_stream_name(batch_job_id)\n", " File "/var/task/app.py", line 71, in get_log_stream_name\n response = logs_client.describe_jobs(jobs=[batch_job_id])\n", " File "/var/lang/lib/python3.11/site-packages/botocore/client.py", line 918, in getattr\n raise AttributeError(\n" ] }

Any help in resolving this much appreciated.


Solution

  • You need to use the AWS Batch client to look up the id of the latest batch execution, and then use the CloudWatch Logs client to retrieve and read that job's log stream:

    import boto3
    import re
    
    batch_client = boto3.client('batch')
    JOB_QUEUE = 'batch-job-queue'  
    
    def get_latest_batch_job_id():
        """Return the jobId of the most recently created job in JOB_QUEUE.

        Bug fix: ``list_jobs`` does not guarantee any ordering, so taking
        ``maxResults=1`` does not reliably return the *latest* job. Instead
        a page of results is fetched and the newest job is chosen by its
        ``createdAt`` timestamp.

        Returns:
            The latest job's id, or None when the queue has no matching
            jobs or the API call fails.
        """
        try:
            response = batch_client.list_jobs(
                jobQueue=JOB_QUEUE,
                jobStatus='SUCCEEDED',  # You can change this as you need
                maxResults=100,
            )
            jobs = response.get('jobSummaryList', [])
            if not jobs:
                print(f"No jobs found in {JOB_QUEUE}")
                return None
            latest_job = max(jobs, key=lambda job: job.get('createdAt', 0))
            return latest_job['jobId']
        except Exception as e:
            # Best-effort: log and return None so the caller can decide
            # how to proceed instead of crashing the Lambda.
            print(f"Error retrieving latest batch job: {e}")
            return None