
Kinesis Firehose Lambda Transformation and Dynamic partition


The data below is generated with the Faker library. I am trying to learn and implement dynamic partitioning in Kinesis Data Firehose.

Sample input payload

{
   "name":"Dr. Nancy Mcmillan",
   "phone_numbers":"8XXXXX",
   "city":"Priscillaport",
   "address":"908 Mitchell Views SXXXXXXXX 42564",
   "date":"1980-07-11",
   "customer_id":"3"
}

Sample producer code

import json
import random

import boto3
from faker import Faker

# Placeholders; prefer the default credential chain over hardcoded keys
AWS_ACCESS_KEY = "XXXXX"
AWS_SECRET_KEY = "XXX"
AWS_REGION_NAME = "us-east-1"


def main():
    # Create the client and the Faker instance once, not on every iteration
    client = boto3.client(
        "kinesis",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION_NAME,
    )
    faker = Faker()

    for _ in range(12):  # 12 sample records
        json_data = {
            "name": faker.name(),
            "phone_numbers": faker.phone_number(),
            "city": faker.city(),
            "address": faker.address(),
            "date": str(faker.date()),
            "customer_id": str(random.randint(1, 5)),
        }
        print(json_data)

        # PartitionKey here picks the Kinesis shard; it is unrelated to
        # Firehose dynamic partitioning
        response = client.put_record(
            StreamName="XXX",
            Data=json.dumps(json_data),
            PartitionKey="test",
        )
        print(response)


if __name__ == "__main__":
    main()

Here is the Lambda transformation code, which works fine:

import base64
import json


class MyHasher(object):
    """Base64-encodes a string (despite the name, no hashing is done)."""

    def __init__(self, key):
        self.key = key

    def get(self):
        keys = str(self.key).encode("UTF-8")
        keys = base64.b64encode(keys)
        return keys.decode("UTF-8")


def lambda_handler(event, context):
    print("Event")
    print(event)

    output = []

    for record in event["records"]:
        dat = base64.b64decode(record["data"])
        serialize_payload = json.loads(dat)
        print("serialize_payload", serialize_payload)

        # json.dumps emits valid JSON; str(dict) would emit a Python repr
        # with single quotes, which downstream JSON consumers cannot parse
        json_new_line = json.dumps(serialize_payload) + "\n"

        encoded = MyHasher(key=json_new_line).get()

        # Keys returned here are referenced in the S3 prefix as
        # !{partitionKeyFromLambda:customer_id}
        partition_keys = {"customer_id": serialize_payload.get("customer_id")}

        output_record = {
            "recordId": record["recordId"],
            "result": "Ok",
            "data": encoded,
            "metadata": {"partitionKeys": partition_keys},
        }

        print(output_record)
        output.append(output_record)

    print("*****************")
    print(output)
    return {"records": output}
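
A quick way to check the transformation locally, before involving Firehose, is to feed it a hand-built event. The sketch below uses a hypothetical `transform` function that reproduces the handler's logic but serializes with `json.dumps` rather than `str()` (calling `str()` on a dict yields a Python repr with single quotes, which is not valid JSON), plus a hypothetical `make_event` helper that builds only the event fields the handler reads. Real Firehose events carry more fields, such as `invocationId` and `approximateArrivalTimestamp`.

```python
import base64
import json


def transform(event):
    """Hypothetical mirror of the handler: re-serialize each record as
    newline-delimited JSON and return customer_id as a partition key."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        line = json.dumps(payload) + "\n"
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(line.encode("utf-8")).decode("utf-8"),
            "metadata": {"partitionKeys": {"customer_id": payload.get("customer_id")}},
        })
    return {"records": output}


def make_event(payloads):
    """Hypothetical helper: a minimal Firehose transformation event
    containing only recordId and base64-encoded data."""
    return {
        "records": [
            {
                "recordId": str(i),
                "data": base64.b64encode(json.dumps(p).encode("utf-8")).decode("utf-8"),
            }
            for i, p in enumerate(payloads)
        ]
    }


if __name__ == "__main__":
    sample = {"name": "Dr. Nancy Mcmillan", "city": "Priscillaport", "customer_id": "3"}
    out = transform(make_event([sample]))["records"][0]
    print(base64.b64decode(out["data"]).decode("utf-8"), end="")
    print(out["metadata"]["partitionKeys"])

    # The str(dict) serialization is rejected by a JSON parser
    try:
        json.loads(str(sample))
    except json.JSONDecodeError as exc:
        print("str(dict) is not valid JSON:", exc)
```

If the decoded `data` is valid JSON and `partitionKeys` holds the expected `customer_id`, the Lambda side is behaving, and any remaining problem is in the Firehose configuration.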





[Screenshot: Lambda test output]

The sample screenshots show that the transformation works fine.

Here are the Firehose settings for dynamic partitioning:

[Screenshots: Firehose dynamic partitioning settings]

For some reason, an error folder appears in the S3 bucket, and all my messages end up in it.
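
To see why records land in the error folder, it can help to decode the files Firehose writes there. The sketch below is hedged: it assumes one JSON object per line with `errorCode`, `errorMessage` and a base64-encoded `rawData` field, following Firehose's documented format for processing-failed records; check a real file from your error prefix before relying on it. `summarize_error_records` is a hypothetical helper name.

```python
import base64
import json


def summarize_error_records(lines):
    """Hypothetical helper: parse newline-delimited Firehose error records
    and return (errorCode, errorMessage, decoded rawData) tuples.

    Assumes each non-empty line is a JSON object with errorCode,
    errorMessage and base64-encoded rawData fields."""
    summaries = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        rec = json.loads(line)
        raw = rec.get("rawData")
        decoded = base64.b64decode(raw).decode("utf-8", errors="replace") if raw else None
        summaries.append((rec.get("errorCode"), rec.get("errorMessage"), decoded))
    return summaries
```

To use it, download an object from the error folder (for example with the S3 console or boto3's `get_object`) and pass `body.decode("utf-8").splitlines()`; the `errorCode` and `errorMessage` values usually point at the failing step.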

I have successfully implemented the Lambda transformation and made a video about it, linked below. I am currently stuck on dynamic partitioning; I have read several posts, but they didn't help.

https://www.youtube.com/watch?v=6wot9Z93vAY&t=231s

Thanks in advance; I look forward to hearing from you.

References

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

https://www.youtube.com/watch?v=HcOVAFn-KhM

https://www.youtube.com/watch?v=PoaKgHdJgCE

https://medium.com/@bv_subhash/kinesis-firehose-performs-partitioning-based-on-timestamps-and-creates-files-in-s3-but-they-would-13efd51f6d39

https://www.amazonaws.cn/en/new/2021/s3-analytics-dynamic-partitioning-kinesis-data-firehose/


Solution

  • There are two ways to supply dynamic partition keys in the S3 prefix: 1) partitionKeyFromQuery and 2) partitionKeyFromLambda. Use the first if you want Firehose itself to parse each record (inline parsing with a JQ expression) and extract the key; use the second if your Lambda transformation returns the key in metadata.partitionKeys. Per your Firehose config, you are using the Lambda to provide the partition key (the second option), but the prefix is written for the first. To resolve this, either disable inline parsing and change the Firehose prefix to !{partitionKeyFromLambda:customer_id}/, or remove the Lambda transformation and keep inline parsing with a partitionKeyFromQuery prefix.
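
The two options can be sketched as boto3 `ExtendedS3DestinationConfiguration` dicts. This is a sketch under stated assumptions: `build_s3_destination` is a hypothetical helper, the ARNs are placeholders, and the `customer_id=` and `errors/` prefix layouts are illustrative choices. The part that matters is that the prefix namespace (`partitionKeyFromLambda` vs `partitionKeyFromQuery`) matches where the key actually comes from.

```python
def build_s3_destination(bucket_arn, role_arn, mode, lambda_arn=None):
    """Hypothetical helper returning an ExtendedS3DestinationConfiguration
    for boto3's firehose create_delivery_stream.

    mode="lambda": the Lambda returns metadata.partitionKeys, so the prefix
    uses partitionKeyFromLambda and no inline (JQ) parsing is configured.
    mode="query": no Lambda; Firehose extracts customer_id with a JQ query,
    so the prefix uses partitionKeyFromQuery.
    """
    if mode == "lambda":
        if lambda_arn is None:
            raise ValueError("lambda mode needs lambda_arn")
        namespace = "partitionKeyFromLambda"
        processors = [{
            "Type": "Lambda",
            "Parameters": [{"ParameterName": "LambdaArn", "ParameterValue": lambda_arn}],
        }]
    elif mode == "query":
        namespace = "partitionKeyFromQuery"
        processors = [{
            "Type": "MetadataExtraction",
            "Parameters": [
                {"ParameterName": "MetadataExtractionQuery",
                 "ParameterValue": "{customer_id: .customer_id}"},
                {"ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6"},
            ],
        }]
    else:
        raise ValueError(f"unknown mode: {mode}")

    return {
        "RoleARN": role_arn,
        "BucketARN": bucket_arn,
        # The key name in the prefix must match the key the Lambda or query emits
        "Prefix": "customer_id=!{" + namespace + ":customer_id}/",
        # An error output prefix is required when dynamic partitioning is enabled
        "ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
        # Dynamic partitioning requires a buffer size of at least 64 MiB
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 60},
        "ProcessingConfiguration": {"Enabled": True, "Processors": processors},
        "DynamicPartitioningConfiguration": {"Enabled": True},
    }
```

The resulting dict would be passed as `ExtendedS3DestinationConfiguration=` to `boto3.client("firehose").create_delivery_stream(...)`, together with `DeliveryStreamType="KinesisStreamAsSource"` and a `KinesisStreamSourceConfiguration` for the data stream used in the producer code.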