Search code examples
amazon-web-servicesaws-lambdaparquetamazon-kinesis-firehose

Error creating a Firehose Datastream through Lambda with Dynamic Partitioning


I'm trying to create Kinesis Firehose streams with Dynamic Partitioning through a Lambda function. The function creates a stream if I remove the DynamicPartitioningConfiguration and ProcessingConfiguration elements, and change the prefix to a Dynamic Partitioning friendly string. The issue is that if those elements are in, I get an error of

kinesis error UnexpectedParameter: Unexpected key 'DynamicPartitioningConfiguration' found in params

Here is the function creating the streams, the removed ARNs and Glue info all work fine with the Dynamic Partioning stuff removed.

const AWS = require("aws-sdk");
var firehose = new AWS.Firehose();

exports.handler = async (event) => {
    
    let params = {
      DeliveryStreamName: event.body.shop+"-data-stream",
      DeliveryStreamType: "DirectPut",
      ExtendedS3DestinationConfiguration: {
        BucketARN: 'arn:aws:s3:::'+event.body.shop,
        RoleARN: 'aValidARN',
        Prefix: '!{partitionKeyFromQuery:obj-type}/',
        ErrorOutputPrefix: 'errors/',
        DataFormatConversionConfiguration: {
            Enabled: true,
            InputFormatConfiguration: {
              Deserializer: {
                  OpenXJsonSerDe: {
                    CaseInsensitive: false,
                    ConvertDotsInJsonKeysToUnderscores: false
                  }
              }
            },  
            OutputFormatConfiguration: {
                Serializer: {
                  ParquetSerDe: {
                    Compression: "SNAPPY",
                    WriterVersion: "V1"
                  }                    
                }
            },
            SchemaConfiguration: {
                RoleARN: "aValidARN",
                DatabaseName: "aValidDBName",
                TableName: "aValidTableName"
            }
        },
        DynamicPartitioningConfiguration: {
            Enabled: "true"
        },  
        ProcessingConfiguration: {
          Enabled: true,
          Processors: [
            {
              Type: "MetadataExtraction",
              Parameters: [
                {
                  ParameterName: "MetadataExtractionQuery",
                  ParameterValue: 'type:.type'
                },
                {
                  ParameterName: "JsonParsingEngine",
                  ParameterValue: 'JQ-1.6'
                }
              ]
            }
          ]
        },         
      }, 
      Tags: [
        {
          Key: 'billing-'+event.body.shop,
          Value: 'true'
        }
      ]
    };
    
    try{
        await firehose.createDeliveryStream(params).promise();    
    }catch(e){
        console.log("kinesis error "+e);
        return {
            statusCode: 500,
            error: JSON.stringify(e)
        };          
    }

    return {
        statusCode: 200,
        body: JSON.stringify("Stream created"),
    };
};

The docs for this are of very little help. I'm using the JS SDK reference and the element is exactly where it should be, nested within the ExtendedS3DestinationConfiguration element. I'm thinking that there is some conflict of settings that isn't documented and that's why the unexpected key return, but I'm new to firehose and lacking the knowledge to troubleshoot this. I've created the ProcessingConfiguration Element based on the answer at https://stackoverflow.com/a/69047987/2755996 and the cloudformation docs at https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-extendeds3destinationconfiguration.html#cfn-kinesisfirehose-deliverystream-extendeds3destinationconfiguration-dynamicpartitioningconfiguration .

Any help would be much appreciated


Solution

  • Check the version of AWS JS SDK. Firehose dynamic partitioning feature was launched in v2.979.0. Use JS SDK v2.979.0 or above.

    Faced similar issue before with python SDK. Upgrading the version solved this issue