I have a use case in which I have to verify that the payloads sent to Kinesis firehose are indeed being sent.
In order to do that I came up with the chain Firehose -> Firehose Data transformation(using lambda) -> DDB -> Check for payload in DDB (the payload is the hashkey in the DDB). I have to define this entire chain in one shot programatically. The data transformation is the same as http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html.
I am doing all this since I cannot completely control the file name in the S3 bucket it goes to. So I need the exact payload being sent into some persistent key value store.
The problem is
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/CreateDeliveryStreamRequest.html does not seem to support adding a data transformation lambda.
My question is, is this doable without touching the console even once (completely through the AWS Kinesis Firehose API).
Or does have any alternate suggestions to move the data to DDB somehow.
Well so I figured it out after much effort and documentation scrounging.
You would have to define a processing configuration with a lambda ARN to define a data transform.
Here is what such a configuration would look like in code.
final ProcessingConfiguration processingConfiguration =
new ProcessingConfiguration().withEnabled(true)
.withProcessors(newProcessor().withType(ProcessorType.Lambda)
.withParameters(new ProcessorParameter().withParameterName(LambdaArn)
.withParameterValue(lamdbaFunctionArn)));
final CreateDeliveryStreamResult describeDeliveryStreamResult =
client.createDeliveryStream(new CreateDeliveryStreamRequest().withExtendedS3DestinationConfiguration(
new ExtendedS3DestinationConfiguration()
.withBucketARN(s3BucketARN)
.withRoleARN(roleArn)
.withPrefix(keyPrefix)
.withProcessingConfiguration(processingConfiguration))
.withDeliveryStreamName(streamName));
Here the ARNs are resource names for various objects (S3 destination, data transform lambda, IAM role etc).