I have a Python Lambda that is subscribed to an SQS queue. The queue messages are generated by a Step Function that is calling Athena. I am having trouble figuring out how to parse the message body to pull out the values I want in the Lambda. If this is the wrong approach, please let me know a better way to get the data out of the SQS message.
I thought I would be able to parse out the values by doing something like this:
record = test['ResultSet']
event_type = record['Data'][0]['VarCharValue']
region = record['Data'][2]['VarCharValue']
user_name = record['Data'][3]['VarCharValue']
but the body is actually just a big string, not JSON. This is the string I'm trying to parse:
'{
ResultSet: {
ResultSetMetadata: {
ColumnInfo: [{
CaseSensitive: true,
CatalogName: hive,
Label: eventname,
Name: eventname,
Nullable: UNKNOWN,
Precision: 2147483647,
Scale: 0,
SchemaName: ,
TableName: ,
Type: varchar
}, {
CaseSensitive: true,
CatalogName: hive,
Label: eventsource,
Name: eventsource,
Nullable: UNKNOWN,
Precision: 2147483647,
Scale: 0,
SchemaName: ,
TableName: ,
Type: varchar
}, {
CaseSensitive: true,
CatalogName: hive,
Label: awsregion,
Name: awsregion,
Nullable: UNKNOWN,
Precision: 2147483647,
Scale: 0,
SchemaName: ,
TableName: ,
Type: varchar
}, {
CaseSensitive: true,
CatalogName: hive,
Label: useridentity.principalid,
Name: useridentity.principalid,
Nullable: UNKNOWN,
Precision: 2147483647,
Scale: 0,
SchemaName: ,
TableName: ,
Type: varchar
}]
},
Rows: [{
Data: [{
VarCharValue: eventname
}, {
VarCharValue: eventsource
}, {
VarCharValue: awsregion
}, {
VarCharValue: useridentity.principalid
}]
}, {
Data: [{
VarCharValue: DeleteBucket
}, {
VarCharValue: s3.amazonaws.com
}, {
VarCharValue: us-west-2
}, {
VarCharValue: fsdfsdf:test.user@user.net
}]
}, {
Data: [{
VarCharValue: StartExecution
}, {
VarCharValue: states.amazonaws.com
}, {
VarCharValue: us-west-2
}, {
VarCharValue: sdfsdfs:test.user@user.net
}]
}, {
Data: [{
VarCharValue: BatchDeleteTable
}, {
VarCharValue: glue.amazonaws.com
}, {
VarCharValue: us-west-2
}, {
VarCharValue: zxcdsfdf:test.user@user.net
}]
}]
},
UpdateCount: 0
}'
How do I get the values out of each of the "Data" elements?
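A quick way to confirm the body is not valid JSON: the keys are unquoted, so json.loads rejects it outright. A minimal sketch using a fragment shaped like the string above:

import json

# A fragment shaped like the body above; note the keys are not double-quoted
body = '{ResultSet: {UpdateCount: 0}}'
try:
    json.loads(body)
except json.JSONDecodeError as err:
    print(f'Not valid JSON: {err}')
    # Not valid JSON: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)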
UPDATE: This is the test script I am using:
import json
eventJson = json.dumps({'Records': [{'messageId': '1ca8da82-e973-4c80-9a62-d8515fe8e436', 'receiptHandle': 'AQEBFsdkwGImOI4UIS0Nmeza+cuYGVRkAanXCHGBzlu6g7xs7nbmQ4O9GIQv2aXILmSH4Am5R1pjRIW/OrG6D8u2F76woZnyJsSBALheJ+i/LA+dxjNZB7vzXbEeX24phvPuXx0bzEDTGXRThkrSfpbpcSUTVQIqYEh39cboahWZX8YI/M22QQ3NQp3TAXiDB21FqyTrN8F9QdZdK6zDR0AVWPm86hWEzLkQ2FuTBCv9/voo2oXy7c9vgo6ByURgGEYS1LYfaR3AuYlR730ZvM/LZ8i+7wOQ41Hvk/QnsLj1WtiY2UxZp1nxmQbrEgyYjznsV813liL2lzj7CaHTZNjQforRft6vaBxhKyr9vV7ve6OTgbEtCUOFS576Z7cSHsflHCacp7uXMExlZW1ql9X7OVdTByw8pjBQ+SwdCyk1JOs=', 'body': '{ResultSet:{ResultSetMetadata:{ColumnInfo:[{CaseSensitive:true,CatalogName:hive,Label:eventname,Name:eventname,Nullable:UNKNOWN,Precision:2147483647,Scale:0,SchemaName:,TableName:,Type:varchar},{CaseSensitive:true,CatalogName:hive,Label:eventsource,Name:eventsource,Nullable:UNKNOWN,Precision:2147483647,Scale:0,SchemaName:,TableName:,Type:varchar},{CaseSensitive:true,CatalogName:hive,Label:awsregion,Name:awsregion,Nullable:UNKNOWN,Precision:2147483647,Scale:0,SchemaName:,TableName:,Type:varchar},{CaseSensitive:true,CatalogName:hive,Label:useridentity.principalid,Name:useridentity.principalid,Nullable:UNKNOWN,Precision:2147483647,Scale:0,SchemaName:,TableName:,Type:varchar}]},Rows:[{Data:[{VarCharValue:eventname},{VarCharValue:eventsource},{VarCharValue:awsregion},{VarCharValue:useridentity.principalid}]},{Data:[{VarCharValue:DeleteBucket},{VarCharValue:s3.amazonaws.com},{VarCharValue:us-west-2},{VarCharValue:KMJ:test.user@user.net}]},{Data:[{VarCharValue:StartExecution},{VarCharValue:states.amazonaws.com},{VarCharValue:us-west-2},{VarCharValue:KMJ:test.user@user.net}]},{Data:[{VarCharValue:BatchDeleteTable},{VarCharValue:glue.amazonaws.com},{VarCharValue:us-west-2},{VarCharValue:KMJ:test.user@user.net}]}]},UpdateCount:0}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1630093682422', 'SenderId': 'LH42ZL:YgNIlUPDBrFrujfZswcgbByYJJVVytEG', 'ApproximateFirstReceiveTimestamp': '1630093687422'}, 'messageAttributes': {}, 'md5OfBody': 'f03d97e71734dd97abd9dfd6666b3549', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-west-2:2347942:drift-detect-que-standard', 'awsRegion': 'us-west-2'}]})
for record in eventJson["Records"]:
    test = record['body']
    test = json.loads(json.dumps(test))
    print(test)
    record = test['ResultSet']
    print(jsonstr)
    event_type = record['Data'][0]['VarCharValue']
    region = record['Data'][2]['VarCharValue']
    user_name = record['Data'][3]['VarCharValue']
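As an aside, and separate from the Athena output format, one problem in this script is that json.dumps serializes the dict into a string, so eventJson["Records"] raises a TypeError; the round trip needs to go the other way. A minimal sketch:

import json

event = {'Records': [{'body': '{...}'}]}  # what Lambda actually hands you: a dict
event_str = json.dumps(event)             # now a str; event_str['Records'] raises TypeError
event_dict = json.loads(event_str)        # parse back into a dict before indexing
print(event_dict['Records'][0]['body'])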
UPDATE 2: As Neil pointed out, there was an issue with my Athena query. After correcting it, the new format I am trying to parse is:
{
"ResultSet": {
"ResultSetMetadata": {
"ColumnInfo": [
{
"CaseSensitive": false,
"CatalogName": "hive",
"Label": "eventname",
"Name": "eventname",
"Nullable": "UNKNOWN",
"Precision": 0,
"Scale": 0,
"SchemaName": "",
"TableName": "",
"Type": "json"
},
{
"CaseSensitive": false,
"CatalogName": "hive",
"Label": "eventsource",
"Name": "eventsource",
"Nullable": "UNKNOWN",
"Precision": 0,
"Scale": 0,
"SchemaName": "",
"TableName": "",
"Type": "json"
},
{
"CaseSensitive": false,
"CatalogName": "hive",
"Label": "awsregion",
"Name": "awsregion",
"Nullable": "UNKNOWN",
"Precision": 0,
"Scale": 0,
"SchemaName": "",
"TableName": "",
"Type": "json"
},
{
"CaseSensitive": false,
"CatalogName": "hive",
"Label": "username",
"Name": "username",
"Nullable": "UNKNOWN",
"Precision": 0,
"Scale": 0,
"SchemaName": "",
"TableName": "",
"Type": "json"
}
]
},
"Rows": [
{
"Data": [
{
"VarCharValue": "eventname"
},
{
"VarCharValue": "eventsource"
},
{
"VarCharValue": "awsregion"
},
{
"VarCharValue": "username"
}
]
},
{
"Data": [
{
"VarCharValue": "\"DeleteBucket\""
},
{
"VarCharValue": "\"s3.amazonaws.com\""
},
{
"VarCharValue": "\"us-west-2\""
},
{
"VarCharValue": "\"AROA4SALNAMTBCVMSUKMJ:travis.jorge@tylerhost.net\""
}
]
},
{
"Data": [
{
"VarCharValue": "\"StartExecution\""
},
{
"VarCharValue": "\"states.amazonaws.com\""
},
{
"VarCharValue": "\"us-west-2\""
},
{
"VarCharValue": "\"AROA4SALNAMTBCVMSUKMJ:travis.jorge@tylerhost.net\""
}
]
},
{
"Data": [
{
"VarCharValue": "\"BatchDeleteTable\""
},
{
"VarCharValue": "\"glue.amazonaws.com\""
},
{
"VarCharValue": "\"us-west-2\""
},
{
"VarCharValue": "\"AROA4SALNAMTBCVMSUKMJ:travis.jorge@tylerhost.net\""
}
]
}
]
},
"UpdateCount": 0
}
Thanks to Neil, who pointed out that my Athena query might have an issue, which it did: I did not realize I had to cast the output to JSON. After adjusting the query to look like this:
SELECT CAST(eventname AS JSON) AS eventname, CAST(eventsource AS JSON) as eventsource, CAST(awsregion AS JSON) as awsregion, CAST("useridentity.principalid" AS JSON) as username FROM "cloudtrail-drift-detect"."cloudtrail_parquet"
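Note that casting each column to JSON is also why every VarCharValue arrives as a JSON-encoded string with embedded quotes (for example "\"DeleteBucket\""); a second json.loads per value strips them. A minimal sketch:

import json

value = '"DeleteBucket"'  # a VarCharValue after CAST(... AS JSON)
print(value)              # "DeleteBucket" -- the quotes are part of the string
print(json.loads(value))  # DeleteBucket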
The query now produced the correct message format:
{
'Records': [{
'messageId': '34104fd9-feb5-4c47-b8dc-2e209c95d4ae',
'receiptHandle': 'AQEBQSLA2j/KE9nj/u7zEibz0uwYaM7qYv9NbwVMmaL74q4zm8V3sQr3OIzwFprC8KvSPXjB7zEibz0uwYaM7qYv9NbwVMmaL74qdhAWqEWkSY2VIujXF2LZ0lTrQVIf40R1M27zEibz0uwYaM7qYv9NbwVMmaL74qdxEIAmV0gQcaGW/nNbtBAujMDlBy0/y3l5pcgYFrFGM7faG5oLodU82H0Fu8E5KwMWSW2NaKarFce7+dqjeU2xDjIShpdafrRBX3dBAoapCn40Ly9kLO7nXBixH1/KhQIAoVXSXLsq1Q4vtuLmyJKaEI0KKm53IaztNrBtH7rAQHhwJVuPcs8yT+M0e235W6FCzr+APeJayoDJwywSZqMZdiQKlgaBexSgIZ60Kx/q+OBondYt4KesOpbq0yvcCAZqOMQHvRtaPSRgo+jit3O7TmMIqrwudBpU=',
'body': '{"ResultSet":{"ResultSetMetadata":{"ColumnInfo":[{"CaseSensitive":false,"CatalogName":"hive","Label":"eventname","Name":"eventname","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"},{"CaseSensitive":false,"CatalogName":"hive","Label":"eventsource","Name":"eventsource","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"},{"CaseSensitive":false,"CatalogName":"hive","Label":"awsregion","Name":"awsregion","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"},{"CaseSensitive":false,"CatalogName":"hive","Label":"username","Name":"username","Nullable":"UNKNOWN","Precision":0,"Scale":0,"SchemaName":"","TableName":"","Type":"json"}]},"Rows":[{"Data":[{"VarCharValue":"eventname"},{"VarCharValue":"eventsource"},{"VarCharValue":"awsregion"},{"VarCharValue":"username"}]},{"Data":[{"VarCharValue":"\\"StartExecution\\""},{"VarCharValue":"\\"states.amazonaws.com\\""},{"VarCharValue":"\\"us-west-2\\""},{"VarCharValue":"\\"AROMTBCVMSUKMJ:test.user@user.net\\""}]},{"Data":[{"VarCharValue":"\\"BatchDeleteTable\\""},{"VarCharValue":"\\"glue.amazonaws.com\\""},{"VarCharValue":"\\"us-west-2\\""},{"VarCharValue":"\\"AROMTBCVMSUKMJ:test.user@user.net\\""}]},{"Data":[{"VarCharValue":"\\"DeleteBucket\\""},{"VarCharValue":"\\"s3.amazonaws.com\\""},{"VarCharValue":"\\"us-west-2\\""},{"VarCharValue":"\\"AROMTBCVMSUKMJ:test.user@user.net\\""}]}]},"UpdateCount":0}',
'attributes': {
'ApproximateReceiveCount': '1',
'SentTimestamp': '1630127317930',
'SenderId': 'AROA4SALSADFSDF42ZL:ASCBDOBYvYcFVSDFSDFSDQisLmnUrifYzXcr',
'ApproximateFirstReceiveTimestamp': '1630127322930'
},
'messageAttributes': {},
'md5OfBody': 'e723ece7243c4c6c0ff79c078d591db9',
'eventSource': 'aws:sqs',
'eventSourceARN': 'arn:aws:sqs:us-west-2:545896547:sqs-que-standard',
'awsRegion': 'us-west-2'
}]
}
I was then able to process the SQS message just like any other JSON message in Python:
for results in event["Records"]:
    results_body = json.loads(results["body"])
    results_rows = results_body["ResultSet"]["Rows"]
    # The first row of an Athena result set holds the column headers, so skip it
    for record in results_rows[1:]:
        # Each value is itself JSON-encoded (e.g. '"DeleteBucket"'), so a second
        # json.loads strips the embedded quotes
        event_type = json.loads(record['Data'][0]['VarCharValue'])
        region = json.loads(record['Data'][2]['VarCharValue'])
        user_name = json.loads(record['Data'][3]['VarCharValue'])
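For completeness, here is a minimal Lambda handler sketch that puts this together; the keys in the returned dicts are illustrative, not part of the SQS message:

import json

def lambda_handler(event, context):
    parsed = []
    for record in event['Records']:
        body = json.loads(record['body'])
        rows = body['ResultSet']['Rows']
        # The first row of an Athena result set contains the column headers
        for row in rows[1:]:
            values = [json.loads(col['VarCharValue']) for col in row['Data']]
            parsed.append({
                'event_type': values[0],
                'event_source': values[1],
                'region': values[2],
                'user_name': values[3],
            })
    return parsed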