Search code examples
python · json · amazon-sqs

parse sqs response with python


I have a Python Lambda that is subscribed to an SQS queue. The queue messages are generated from a Step Function that is calling Athena. I am having trouble figuring out how to parse the message body to pull out the values I want in the Lambda. If this is the wrong approach, please let me know a better way to get the data out of the SQS message.

I thought I would be able to parse out the values doing something like this

# Non-working attempt (quoted from the question): grab the ResultSet and
# read the column values directly off it.
record  = test['ResultSet']

# NOTE(review): in the Athena GetQueryResults shape shown below, 'Data' is
# nested inside ResultSet['Rows'][i], not directly on ResultSet, so these
# lookups raise KeyError as written.
event_type = record['Data'][0]['VarCharValue']
region = record['Data'][2]['VarCharValue']
user_name = record['Data'][3]['VarCharValue']

but the body is actually just a big string, not JSON. This is the string I'm trying to parse

    '{
    ResultSet: {
        ResultSetMetadata: {
            ColumnInfo: [{
                CaseSensitive: true,
                CatalogName: hive,
                Label: eventname,
                Name: eventname,
                Nullable: UNKNOWN,
                Precision: 2147483647,
                Scale: 0,
                SchemaName: ,
                TableName: ,
                Type: varchar
            }, {
                CaseSensitive: true,
                CatalogName: hive,
                Label: eventsource,
                Name: eventsource,
                Nullable: UNKNOWN,
                Precision: 2147483647,
                Scale: 0,
                SchemaName: ,
                TableName: ,
                Type: varchar
            }, {
                CaseSensitive: true,
                CatalogName: hive,
                Label: awsregion,
                Name: awsregion,
                Nullable: UNKNOWN,
                Precision: 2147483647,
                Scale: 0,
                SchemaName: ,
                TableName: ,
                Type: varchar
            }, {
                CaseSensitive: true,
                CatalogName: hive,
                Label: useridentity.principalid,
                Name: useridentity.principalid,
                Nullable: UNKNOWN,
                Precision: 2147483647,
                Scale: 0,
                SchemaName: ,
                TableName: ,
                Type: varchar
            }]
        },
        Rows: [{
            Data: [{
                VarCharValue: eventname
            }, {
                VarCharValue: eventsource
            }, {
                VarCharValue: awsregion
            }, {
                VarCharValue: useridentity.principalid
            }]
        }, {
            Data: [{
                VarCharValue: DeleteBucket
            }, {
                VarCharValue: s3.amazonaws.com
            }, {
                VarCharValue: us - west - 2
            }, {
                VarCharValue: fsdfsdf: test.user @user.net
            }]
        }, {
            Data: [{
                VarCharValue: StartExecution
            }, {
                VarCharValue: states.amazonaws.com
            }, {
                VarCharValue: us - west - 2
            }, {
                VarCharValue: sdfsdfs: test.user @user.net
            }]
        }, {
            Data: [{
                VarCharValue: BatchDeleteTable
            }, {
                VarCharValue: glue.amazonaws.com
            }, {
                VarCharValue: us - west - 2
            }, {
                VarCharValue: zxcdsfdf: test.user @user.net
            }]
        }]
    },
    UpdateCount: 0
 }
}

How do I get the values out of each of the "Data" elements?

UPDATE: This is the test script I am using

import json

# Sample SQS event as the Lambda receives it.  Kept as a Python dict: the
# original wrapped it in json.dumps(), which turned the whole event into a
# string, so eventJson["Records"] raised TypeError.  The "body" value below
# is valid JSON (quoted keys and values), matching the corrected Athena
# output; ResultSetMetadata is omitted for brevity since parsing only needs
# ResultSet -> Rows -> Data.
sample_event = {
    'Records': [{
        'messageId': '1ca8da82-e973-4c80-9a62-d8515fe8e436',
        'body': (
            '{"ResultSet":{"Rows":['
            '{"Data":[{"VarCharValue":"eventname"},{"VarCharValue":"eventsource"},'
            '{"VarCharValue":"awsregion"},{"VarCharValue":"useridentity.principalid"}]},'
            '{"Data":[{"VarCharValue":"DeleteBucket"},{"VarCharValue":"s3.amazonaws.com"},'
            '{"VarCharValue":"us-west-2"},{"VarCharValue":"KMJ:test.user@user.net"}]}'
            ']},"UpdateCount":0}'
        ),
        'eventSource': 'aws:sqs',
        'awsRegion': 'us-west-2',
    }],
}


def parse_sqs_records(event):
    """Extract values from an SQS event whose message bodies hold Athena results.

    Parameters:
        event: the Lambda SQS event dict, with event["Records"][i]["body"]
            being a JSON string in the Athena GetQueryResults shape.

    Returns:
        A list of (event_type, region, user_name) tuples, one per row in
        ResultSet["Rows"] (including Athena's header row, if present).
    """
    parsed = []
    for record in event["Records"]:
        # The body is a JSON *string*; one json.loads turns it into a dict.
        # (The original code never parsed it, then also skipped the "Rows"
        # level, so every lookup failed.)
        body = json.loads(record["body"])
        for row in body["ResultSet"]["Rows"]:
            data = row["Data"]
            parsed.append((
                data[0]["VarCharValue"],   # eventname
                data[2]["VarCharValue"],   # awsregion
                data[3]["VarCharValue"],   # useridentity.principalid
            ))
    return parsed


for event_type, region, user_name in parse_sqs_records(sample_event):
    print(event_type, region, user_name)

UPDATE 2: As Neil pointed out, there was an issue with my Athena query. After correcting it, the new format that I am trying to parse is

    {
    "ResultSet": {
        "ResultSetMetadata": {
            "ColumnInfo": [
                {
                    "CaseSensitive": false,
                    "CatalogName": "hive",
                    "Label": "eventname",
                    "Name": "eventname",
                    "Nullable": "UNKNOWN",
                    "Precision": 0,
                    "Scale": 0,
                    "SchemaName": "",
                    "TableName": "",
                    "Type": "json"
                },
                {
                    "CaseSensitive": false,
                    "CatalogName": "hive",
                    "Label": "eventsource",
                    "Name": "eventsource",
                    "Nullable": "UNKNOWN",
                    "Precision": 0,
                    "Scale": 0,
                    "SchemaName": "",
                    "TableName": "",
                    "Type": "json"
                },
                {
                    "CaseSensitive": false,
                    "CatalogName": "hive",
                    "Label": "awsregion",
                    "Name": "awsregion",
                    "Nullable": "UNKNOWN",
                    "Precision": 0,
                    "Scale": 0,
                    "SchemaName": "",
                    "TableName": "",
                    "Type": "json"
                },
                {
                    "CaseSensitive": false,
                    "CatalogName": "hive",
                    "Label": "username",
                    "Name": "username",
                    "Nullable": "UNKNOWN",
                    "Precision": 0,
                    "Scale": 0,
                    "SchemaName": "",
                    "TableName": "",
                    "Type": "json"
                }
            ]
        },
        "Rows": [
            {
                "Data": [
                    {
                        "VarCharValue": "eventname"
                    },
                    {
                        "VarCharValue": "eventsource"
                    },
                    {
                        "VarCharValue": "awsregion"
                    },
                    {
                        "VarCharValue": "username"
                    }
                ]
            },
            {
                "Data": [
                    {
                        "VarCharValue": "\"DeleteBucket\""
                    },
                    {
                        "VarCharValue": "\"s3.amazonaws.com\""
                    },
                    {
                        "VarCharValue": "\"us-west-2\""
                    },
                    {
                        "VarCharValue": "\"AROA4SALNAMTBCVMSUKMJ:travis.jorge@tylerhost.net\""
                    }
                ]
            },
            {
                "Data": [
                    {
                        "VarCharValue": "\"StartExecution\""
                    },
                    {
                        "VarCharValue": "\"states.amazonaws.com\""
                    },
                    {
                        "VarCharValue": "\"us-west-2\""
                    },
                    {
                        "VarCharValue": "\"AROA4SALNAMTBCVMSUKMJ:travis.jorge@tylerhost.net\""
                    }
                ]
            },
            {
                "Data": [
                    {
                        "VarCharValue": "\"BatchDeleteTable\""
                    },
                    {
                        "VarCharValue": "\"glue.amazonaws.com\""
                    },
                    {
                        "VarCharValue": "\"us-west-2\""
                    },
                    {
                        "VarCharValue": "\"AROA4SALNAMTBCVMSUKMJ:travis.jorge@tylerhost.net\""
                    }
                ]
            }
        ]
    },
    "UpdateCount": 0
}

Solution

  • Thanks to Neil, who pointed out that my Athena query might have an issue — which it did. I did not realize I had to convert the output to JSON. So after adjusting the query to be like this:

    SELECT CAST(eventname AS JSON) AS eventname, CAST(eventsource AS JSON) as eventsource, CAST(awsregion AS JSON) as awsregion, CAST("useridentity.principalid" AS JSON) as username FROM "cloudtrail-drift-detect"."cloudtrail_parquet"
    

    Which now produced the correct message format:

    {
        'Records': [{
            'messageId': '34104fd9-feb5-4c47-b8dc-2e209c95d4ae',
            'receiptHandle': 'AQEBQSLA2j/KE9nj/u7zEibz0uwYaM7qYv9NbwVMmaL74q4zm8V3sQr3OIzwFprC8KvSPXjB7zEibz0uwYaM7qYv9NbwVMmaL74qdhAWqEWkSY2VIujXF2LZ0lTrQVIf40R1M27zEibz0uwYaM7qYv9NbwVMmaL74qdxEIAmV0gQcaGW/nNbtBAujMDlBy0/y3l5pcgYFrFGM7faG5oLodU82H0Fu8E5KwMWSW2NaKarFce7+dqjeU2xDjIShpdafrRBX3dBAoapCn40Ly9kLO7nXBixH1/KhQIAoVXSXLsq1Q4vtuLmyJKaEI0KKm53IaztNrBtH7rAQHhwJVuPcs8yT+M0e235W6FCzr+APeJayoDJwywSZqMZdiQKlgaBexSgIZ60Kx/q+OBondYt4KesOpbq0yvcCAZqOMQHvRtaPSRgo+jit3O7TmMIqrwudBpU=',
            'body': '{"ResultSet":{"ResultSetMetadata":{"ColumnInfo":[{"CaseSensitive":false,"CatalogName":"hive","Label":"eventname","Name":"eventname","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"},{"CaseSensitive":false,"CatalogName":"hive","Label":"eventsource","Name":"eventsource","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"},{"CaseSensitive":false,"CatalogName":"hive","Label":"awsregion","Name":"awsregion","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"},{"CaseSensitive":false,"CatalogName":"hive","Label":"username","Name":"username","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"}]},"Rows":[{"Data":[{"VarCharValue":"eventname"},{"VarCharValue":"eventsource"},{"VarCharValue":"awsregion"},{"VarCharValue":"username"}]},{"Data":[{"VarCharValue":"\\"StartExecution\\""},{"VarCharValue":"\\"states.amazonaws.com\\""},{"VarCharValue":"\\"us-west-2\\""},{"VarCharValue":"\\"AROMTBCVMSUKMJ:test.user@user.net\\""}]},{"Data":[{"VarCharValue":"\\"BatchDeleteTable\\""},{"VarCharValue":"\\"glue.amazonaws.com\\""},{"VarCharValue":"\\"us-west-2\\""},{"VarCharValue":"\\"AROMTBCVMSUKMJ:test.user@user.net\\""}]},{"Data":[{"VarCharValue":"\\"DeleteBucket\\""},{"VarCharValue":"\\"s3.amazonaws.com\\""},{"VarCharValue":"\\"us-west-2\\""},{"VarCharValue":"\\"AROMTBCVMSUKMJ:test.user@user.net\\""}]}]},"UpdateCount":0}',
            'attributes': {
                'ApproximateReceiveCount': '1',
                'SentTimestamp': '1630127317930',
                'SenderId': 'AROA4SALSADFSDF42ZL:ASCBDOBYvYcFVSDFSDFSDQisLmnUrifYzXcr',
                'ApproximateFirstReceiveTimestamp': '1630127322930'
            },
            'messageAttributes': {},
            'md5OfBody': 'e723ece7243c4c6c0ff79c078d591db9',
            'eventSource': 'aws:sqs',
            'eventSourceARN': 'arn:aws:sqs:us-west-2:545896547:sqs-que-standard',
            'awsRegion': 'us-west-2'
        }]
    }
    

    I was then able to process the SQS message just like any other JSON message in Python:

    for results in event["Records"]:
        # "body" arrives as a JSON string; a single json.loads parses it.
        # (The original's json.loads(json.dumps(results)) was a no-op
        # round-trip on an already-parsed dict and has been dropped.)
        results_body = json.loads(results["body"])
        results_rows = results_body["ResultSet"]["Rows"]
        # NOTE(review): Rows[0] is the Athena header row ("eventname", ...);
        # iterate results_rows[1:] if the header should be skipped.
        for record in results_rows:
            event_type = record['Data'][0]['VarCharValue']
            region = record['Data'][2]['VarCharValue']
            user_name = record['Data'][3]['VarCharValue']