
AWS Firehose Dynamic Partitioning Inline Parsing with Direct PUT


I have created a Firehose delivery stream that uses dynamic partitioning with inline parsing. It is Direct PUT because I am using a Lambda to get data from an event source that doesn't connect directly with Firehose; the Lambda then makes put_record_batch calls to send the data to Firehose. The JSON objects that I am sending to Firehose look like this:

{
  "customer_id": "123123123",
  ...
}

According to the documentation here, when configuring Firehose I should be able to partition based on customer_id just by specifying the following (a sketch of the equivalent boto3 configuration follows the list):

  1. Set Dynamic Partitioning to enabled
  2. Set Multi-Record DeAggregation to enabled
  3. Set Inline Parsing for JSON to enabled
  4. Create a dynamic partitioning key name of customer_id with a jq expression of .customer_id
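
For reference, here is roughly how that same setup looks through boto3's create_delivery_stream instead of the console. This is a minimal sketch: the stream name, bucket ARN, role ARN, and S3 prefixes are placeholders, not values from my actual setup.

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='my-stream',  # placeholder
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'BucketARN': 'arn:aws:s3:::my-bucket',  # placeholder
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-role',  # placeholder
        # The key extracted by MetadataExtraction below is referenced here
        'Prefix': 'data/customer_id=!{partitionKeyFromQuery:customer_id}/',
        'ErrorOutputPrefix': 'errors/!{firehose:error-output-type}/',
        # Dynamic partitioning requires a buffer size of at least 64 MB
        'BufferingHints': {'SizeInMBs': 64, 'IntervalInSeconds': 60},
        'DynamicPartitioningConfiguration': {'Enabled': True},   # step 1
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [
                {
                    'Type': 'RecordDeAggregation',                # step 2
                    'Parameters': [
                        {'ParameterName': 'SubRecordType', 'ParameterValue': 'JSON'},
                    ],
                },
                {
                    'Type': 'MetadataExtraction',                 # steps 3 and 4
                    'Parameters': [
                        {'ParameterName': 'MetadataExtractionQuery',
                         'ParameterValue': '{customer_id: .customer_id}'},
                        {'ParameterName': 'JsonParsingEngine',
                         'ParameterValue': 'JQ-1.6'},
                    ],
                },
            ],
        },
    },
)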

Then, if I send a put_record_batch request like the following:

import base64
import json

import boto3

client = boto3.client('firehose')

payload1 = {"customer_id": "123123"}
encoded_data1 = base64.b64encode(json.dumps(payload1).encode('UTF-8'))

payload2 = {"customer_id": "323232"}
encoded_data2 = base64.b64encode(json.dumps(payload2).encode('UTF-8'))

response = client.put_record_batch(
    DeliveryStreamName='string',
    Records=[
        {
            'Data': encoded_data1
        },
        {
            'Data': encoded_data2
        },
    ]
)

I should see the data partitioned based on "customer_id" when it is delivered to S3. Instead, I get the following error:

{
    "deliveryStreamARN": "<arn>",
    "destination": "<s3 bucket>",
    "deliveryStreamVersionId": 4,
    "message": "Metadata extraction failed because of invalid record.",
    "errorCode": "DynamicPartitioning.MetadataExtractionFailed"
}

and the error record that gets stored in S3 says:

"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"

What am I doing wrong? Clearly I sent JSON to Firehose.


Solution

  • The data you put under the 'Data' key should not be base64-encoded; boto3 handles that for you. Just do this:

    response = client.put_record_batch(
        DeliveryStreamName=stream_name,
        Records=[
            {
                'Data': json.dumps(payload1)
            },
            {
                'Data': json.dumps(payload2)
            },
        ]
    )
    

    The AWS documentation says the Data blob is base64-encoded automatically once the request is serialized. If you encode it yourself, Firehose ends up with your base64 string as the record payload instead of JSON, which is why inline parsing fails.
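
    To make the failure mode concrete, here is a minimal sketch of what each approach actually puts in the record. The base64 variant stores base64 text, which is what the jq parser then rejects as "Non JSON record provided":

    import base64
    import json

    payload = {"customer_id": "123123"}

    # What the original code stored in the record: base64 text, not JSON
    print(base64.b64encode(json.dumps(payload).encode('UTF-8')))
    # b'eyJjdXN0b21lcl9pZCI6ICIxMjMxMjMifQ=='

    # What the record should contain: the JSON text itself
    print(json.dumps(payload))
    # {"customer_id": "123123"}

    One operational note: put_record_batch can partially fail, so check FailedPutCount (and the per-record ErrorCode entries) in the response before assuming every record was accepted.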