amazon-web-services, logging, aws-lambda, amazon-cloudwatch

Can you parse then query on the parse value in CloudWatch Insights?


My scenario is this: I can search for an error message in CloudWatch and get all the results I want. From those results, I want to extract the @requestId (only for results that match the error), and then, for each @requestId, return all of its logs.

I have tried parsing the message where the GUID for the requestId appears, like this:

parse "Z * Task timed out" as msgId 

Then filtering on

filter strcontains(@message, msgId) 

But this returns zero results. Likewise, I've also tried:

filter ispresent(msgId)

But this just returns the results where the parse command produced a non-null value.

Short of adding a dedup command and building my own list to run a separate search against, I can't seem to find a way to achieve this.

Can you do what I'm trying to do here? If not, what would you recommend as an alternative?


Solution

  • While this isn't possible within CloudWatch Logs Insights itself, it can be achieved by using the SDK or the AWS CLI from your language of choice.

    After getting that list of matching results from the Insights query, I could iterate through each response and pull out the log stream, request ID, and timestamp (and, using Python, save them in a DataFrame).
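    As a rough sketch, fetching that initial list could look something like the following, using the same subprocess + AWS CLI approach (this assumes profile_name and log_group_name are defined as in the next snippet, and the query_string here is only an example of an Insights query returning the fields used further down):

    import json
    import time
    import subprocess
    import pandas as pd

    # example Insights query surfacing the fields used below
    query_string = ('fields @timestamp, @logStream, @requestId, @message '
                    '| filter @message like /Task timed out/')

    def run_insights_query(start_time, end_time):
        # start the Insights query and grab its id
        start = subprocess.run(
            [
                'aws', 'logs', 'start-query',
                '--log-group-name', log_group_name,
                '--start-time', str(start_time),
                '--end-time', str(end_time),
                '--query-string', query_string,
                '--profile', profile_name
            ],
            capture_output=True, text=True)
        query_id = json.loads(start.stdout)['queryId']

        # poll until the query finishes
        while True:
            result = subprocess.run(
                [
                    'aws', 'logs', 'get-query-results',
                    '--query-id', query_id,
                    '--profile', profile_name
                ],
                capture_output=True, text=True)
            output = json.loads(result.stdout)
            if output['status'] == 'Complete':
                break
            time.sleep(1)

        # each result row is a list of {'field': ..., 'value': ...} pairs
        rows = [{col['field']: col['value'] for col in row}
                for row in output['results']]
        return pd.DataFrame(rows)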

    Then, to pull back the full log events for each result, I wrote a method like this:

    import re
    import json
    import subprocess
    import pandas as pd

    # vars
    profile_name = '<your aws cli profile name>'
    log_group_name = '<your log group name, e.g. /aws/lambda/your-lambda>'

    def get_log_data_events(log_stream_name, start_time):
        # shell out to the AWS CLI to pull the events from the given log stream
        log_output = subprocess.run(
            [
                'aws',
                'logs',
                'get-log-events',
                '--log-group-name', log_group_name,
                '--log-stream-name', log_stream_name,
                '--start-time', start_time,
                '--start-from-head',
                '--profile', profile_name
            ],
            capture_output=True, text=True)
        log_output_json = json.loads(log_output.stdout)

        # only the 'events' list is needed; the rest is paging metadata
        return log_output_json['events']
    

    Returning ['events'] gives you just the JSON data for the events themselves, without the extra response metadata you may not need.
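    For reference, each entry in that events list has a shape roughly like this (the values here are made up):

    # illustrative shape of a single entry in ['events']
    {
        'timestamp': 1700000000000,   # event time, in epoch milliseconds
        'message': 'START RequestId: 1234abcd-56ef-78ab-90cd-1234567890ab Version: $LATEST\n',
        'ingestionTime': 1700000000500
    }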

    Then populating df['log_data'] with a row-wise df.apply like this:

    df['log_data'] = df.apply(lambda row: get_log_data_events(row['@logStream'], str(row['@timestamp'].timestamp()).split('.')[0]), axis=1)
    

    (Note: the split removes the .000 fractional part from the default timestamp and converts it to a clean epoch value. A prerequisite is having @timestamp in your DataFrame as a Timestamp datatype.)
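    If @timestamp comes back from the query results as a string, a quick conversion covers that prerequisite:

    # ensure @timestamp is a pandas Timestamp so .timestamp() is available above
    df['@timestamp'] = pd.to_datetime(df['@timestamp'])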

    After that, I applied some search queries to find the specific data I needed:

    # vars
    search_in_message = 'Processing Message: {'
    
    def get_payload_event(log_data):
        payload_search = search_in_message
        for event in log_data:
            if payload_search in event['message']:
                # pull the JSON object embedded in the message
                json_str = re.search(r'{.*}', event['message']).group()
                print(json_str)
                return json.loads(json_str)
    

    This let me isolate the exact message I was looking for, using another df.apply:

    df['message'] = df['log_data'].apply(lambda log_data: get_payload_event(log_data))
    

    Ultimately, I was able to grab those payloads and resubmit them to an SQS queue, but that's outside the scope of my original question.