I have created a Firehose delivery stream that uses dynamic partitioning with inline parsing. It is Direct PUT because I am using a Lambda to pull data from an event source that doesn't integrate directly with Firehose, and the Lambda then makes put_record_batch calls to send the data to Firehose. The JSON objects that I am sending to Firehose look like this:
{
"customer_id": "123123123",
...
}
According to the documentation here, when configuring Firehose I should be able to partition based on customer_id by simply specifying the key name
customer_id
and a jq expression of .customer_id.
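For reference, the relevant part of the delivery stream configuration, expressed as a boto3 call, looks roughly like this (I actually set it up in the console; the stream name, bucket ARN, role ARN, and buffering values below are placeholders):

import boto3

firehose = boto3.client('firehose')

# Sketch of the dynamic partitioning setup with inline (jq) parsing.
# The MetadataExtraction processor holds the jq expression, and the S3
# prefix references the extracted key via partitionKeyFromQuery.
firehose.create_delivery_stream(
    DeliveryStreamName='my-stream',                      # placeholder
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'BucketARN': 'arn:aws:s3:::my-bucket',           # placeholder
        'RoleARN': 'arn:aws:iam::111122223333:role/firehose-role',  # placeholder
        'Prefix': 'customer_id=!{partitionKeyFromQuery:customer_id}/',
        'ErrorOutputPrefix': 'errors/!{firehose:error-output-type}/',
        'BufferingHints': {'SizeInMBs': 64, 'IntervalInSeconds': 60},
        'DynamicPartitioningConfiguration': {'Enabled': True},
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'MetadataExtraction',
                'Parameters': [
                    {'ParameterName': 'MetadataExtractionQuery',
                     'ParameterValue': '{customer_id:.customer_id}'},
                    {'ParameterName': 'JsonParsingEngine',
                     'ParameterValue': 'JQ-1.6'},
                ],
            }],
        },
    },
)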
Then if I send through a put_record_batch request like the following:
payload1 = {"customer_id": "123123"}
encoded_data1 = base64.b64encode(json.dumps(payload1).encode('UTF-8'))
payload2 = {"customer_id": "323232"}
encoded_data2 = base64.b64encode(json.dumps(payload2).encode('UTF-8'))
response = client.put_record_batch(
DeliveryStreamName='string',
Records=[
{
'Data': encoded_data1
},
{
'Data': encoded_data2
},
]
)
I should be able to have the data partitioned based on "customer_id" when it is sent to S3. Instead, I get the following error:
{
"deliveryStreamARN": "<arn>",
"destination": "<s3 bucket>",
"deliveryStreamVersionId": 4,
"message": "Metadata extraction failed because of invalid record.",
"errorCode": "DynamicPartitioning.MetadataExtractionFailed"
}
and the error record that gets stored in S3 says
"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"
What am I doing wrong? Clearly I sent JSON to Firehose.
The data you put inside the 'Data' key should not be base64 encoded. Just do this:
response = client.put_record_batch(
    DeliveryStreamName=stream_name,
    Records=[
        {
            # pass the JSON string as-is; the SDK base64-encodes the blob
            # when it serializes the request
            'Data': json.dumps(payload1)
        },
        {
            'Data': json.dumps(payload2)
        },
    ]
)
The AWS documentation says the Data blob will be base64 encoded for you once it is serialized. If you also base64 encode it yourself, Firehose ends up storing a base64 string rather than JSON, which is why metadata extraction complains about a "Non JSON record".
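A quick way to see the difference is to print what Firehose would actually receive in each case (payload value taken from the question; this is just an illustrative sketch):

import base64
import json

payload = {"customer_id": "123123"}

# Pre-encoding: Firehose receives a base64 string, which the jq
# metadata-extraction query cannot parse as JSON.
double_encoded = base64.b64encode(json.dumps(payload).encode('UTF-8'))
print(double_encoded.decode())  # eyJjdXN0b21lcl9pZCI6ICIxMjMxMjMifQ==

# Sending the JSON string directly: Firehose receives valid JSON and
# can extract customer_id for partitioning.
print(json.dumps(payload))      # {"customer_id": "123123"}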