amazon-web-services amazon-kinesis-firehose

AWS Firehose - Where is the definition of the event format to process in lambda?


The Amazon Kinesis Data Firehose Data Transformation documentation does not describe the format of the event data that Firehose sends to a Lambda function.

How can we write a Lambda function to perform the transformation without that information?


Solution

  • After spending a lot of time:


    To see the event that Firehose sends to Lambda, generate a sample with the AWS SAM CLI:

    $ sam local generate-event kinesis kinesis-firehose
    {
      "invocationId": "invocationIdExample",
      "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
      "region": "us-east-1",
      "records": [
        {
          "recordId": "49546986683135544286507457936321625675700192471156785154",
          "approximateArrivalTimestamp": 1495072949453,
          "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
        }
      ]
    }
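Each record's `data` field is base64-encoded. Assuming a Python Lambda runtime (the dict-style output in the logs below suggests one), here is a minimal sketch that decodes the sample event above. `decode_records` is a hypothetical helper name, not part of any AWS library.

```python
import base64
import json

# The sample event printed by `sam local generate-event kinesis kinesis-firehose`.
SAMPLE_EVENT = json.loads("""
{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "region": "us-east-1",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
    }
  ]
}
""")


def decode_records(event):
    """Return (recordId, decoded text) pairs for every record in a Firehose event."""
    return [
        (record["recordId"], base64.b64decode(record["data"]).decode("utf-8"))
        for record in event["records"]
    ]


for record_id, text in decode_records(SAMPLE_EVENT):
    print(record_id, text)
# prints: 49546986683135544286507457936321625675700192471156785154 Hello, this is a test 123.
```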
    

    Testing Firehose/Lambda

    Testing the Firehose and Lambda setup from "Build a Modern Application on AWS", Module 5, with the AWS CLI.

    Testing Lambda

    aws lambda invoke --function-name ${FUNCTION_NAME} \
    --qualifier ${FUNCTION_ALIAS} \
    --payload file://./event.json \
    response.json
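Roughly the same invocation from Python, as a sketch assuming boto3. `invoke_transform` is a hypothetical name, and the client is passed in (for example `boto3.client("lambda")`) so it can be swapped for a stub when testing. If you use AWS CLI v2, the command above may also need `--cli-binary-format raw-in-base64-out`, because v2 expects blob parameters such as `--payload` to be base64-encoded by default; boto3 accepts raw bytes, so the issue does not arise there.

```python
import json


def invoke_transform(lambda_client, function_name, qualifier, event):
    """Invoke a Lambda function synchronously with a test Firehose event.

    `lambda_client` is expected to be a boto3 Lambda client, e.g. boto3.client("lambda").
    Returns the JSON document the handler returned.
    """
    response = lambda_client.invoke(
        FunctionName=function_name,
        Qualifier=qualifier,  # version or alias, same as --qualifier
        Payload=json.dumps(event).encode("utf-8"),
    )
    # "Payload" is a streaming body; read it and parse the handler's JSON result.
    return json.loads(response["Payload"].read())
```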
    
    • event.json
    {
      "records": [
        {
          "recordId": "1",
          "data": "'eyJ1c2VySWQiOiAiY3VycmVudFVzZXJJZCIsICJteXNmaXRJZCI6ICI0ZTUzOTIwYy01MDVhLTRhOTAtYTY5NC1iOTMwMDc5MWYwYWUifQ=='"
        }
      ]
    }
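Note that the `data` value above is wrapped in literal single quotes, probably left over from shell quoting. The log below shows it was still decoded; if the handler uses Python's `base64.b64decode`, that would be because its default `validate=False` discards characters outside the base64 alphabet. A safer option is to generate the file. Here is a sketch, where `make_test_event` is a hypothetical helper that fills in only the fields used in `event.json` above:

```python
import base64
import json


def make_test_event(payloads):
    """Build a minimal Firehose-style test event from JSON-serialisable dicts.

    Only "records", "recordId" and "data" are filled in, like event.json above;
    a real Firehose invocation also carries invocationId, deliveryStreamArn,
    region and approximateArrivalTimestamp.
    """
    return {
        "records": [
            {
                "recordId": str(i),
                # base64 of the JSON text, with no surrounding quote characters
                "data": base64.b64encode(json.dumps(p).encode("utf-8")).decode("ascii"),
            }
            for i, p in enumerate(payloads, start=1)
        ]
    }


event = make_test_event([
    {"userId": "currentUserId", "mysfitId": "4e53920c-505a-4a90-a694-b9300791f0ae"}
])
with open("event.json", "w") as f:
    json.dump(event, f, indent=2)
```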
    

    Resulting Lambda log:

    START RequestId: e15a50f9-20a5-48ce-9942-9681291910fe Version: 13
    {'records': [{'recordId': '1', 'data': "'eyJ1c2VySWQiOiAiY3VycmVudFVzZXJJZCIsICJteXNmaXRJZCI6ICI0ZTUzOTIwYy01MDVhLTRhOTAtYTY5NC1iOTMwMDc5MWYwYWUifQ=='"}]}
    Processing record: 1
    {
        "userId": "currentUserId",
        "mysfitId": "4e53920c-505a-4a90-a694-b9300791f0ae",
        "goodevil": "Evil",
        "lawchaos": "Lawful",
        "species": "Chimera"
    }
    Successfully processed 1 records.
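The log lines hint at the handler's shape. Below is a sketch that reproduces that output and returns the response Firehose expects from a transformation function: each output record echoes the incoming `recordId`, sets `result` to `Ok`, `Dropped` or `ProcessingFailed`, and carries base64-encoded `data`. This is not the workshop's actual code; `lookup_mysfit` is a hypothetical stub standing in for whatever step added `goodevil`, `lawchaos` and `species`, which this post does not show.

```python
import base64
import json


def lookup_mysfit(mysfit_id):
    """HYPOTHETICAL stub for the enrichment step (goodevil, lawchaos, species).

    The real data source is not shown in this post; replace this with your own lookup.
    """
    return {}


def lambda_handler(event, context):
    """Sketch of a Firehose data-transformation handler (Python runtime assumed)."""
    print(event)
    output = []
    for record in event["records"]:
        print("Processing record: " + record["recordId"])
        # Incoming data is base64-encoded JSON text.
        payload = json.loads(base64.b64decode(record["data"]))
        payload.update(lookup_mysfit(payload.get("mysfitId")))
        print(json.dumps(payload, indent=4))
        output.append({
            # Firehose matches results to inputs by recordId, so echo it unchanged.
            "recordId": record["recordId"],
            # One of "Ok", "Dropped" or "ProcessingFailed".
            "result": "Ok",
            # The transformed payload must be base64-encoded again.
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii"),
        })
    print("Successfully processed {} records.".format(len(output)))
    return {"records": output}
```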
    

    Testing Firehose + Lambda

    echo "Testing Firehose put-record using --record file://./data.json"
    aws firehose put-record --delivery-stream-name ${DELIVERY_STREAM_NAME} \
    --record file://./data.json
    
    echo "Testing Firehose put-record using --record with inline JSON"
    # aws firehose put-record --delivery-stream-name mystream --record="{\"Data\":\"1\"}"
    aws firehose put-record --delivery-stream-name "${DELIVERY_STREAM_NAME}" \
    --record='{"Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"}'
    
    echo "Testing Firehose put-record using --cli-input-json"
    aws firehose put-record \
    --cli-input-json '
    {
        "DeliveryStreamName": '\"${DELIVERY_STREAM_NAME}\"',
        "Record": {
            "Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"
        }
    }'
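The same put-record from Python, as a sketch assuming boto3. `put_click` is a hypothetical name; pass `boto3.client("firehose")` as the client in real use. As with `lambda invoke`, AWS CLI v2 users may need `--cli-binary-format raw-in-base64-out` on the commands above so that `Data` is sent as raw text rather than being decoded as base64.

```python
import json


def put_click(firehose_client, stream_name, click):
    """Send one JSON record to a Firehose delivery stream.

    `firehose_client` is expected to be a boto3 Firehose client,
    e.g. boto3.client("firehose").
    """
    return firehose_client.put_record(
        DeliveryStreamName=stream_name,
        # boto3 takes raw bytes here; Firehose later base64-encodes them
        # in the "data" field of the event it sends to the Lambda.
        Record={"Data": json.dumps(click).encode("utf-8")},
    )
```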
    

    data.json

    {
        "Data":"{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"
    }
    

    Result

    START RequestId: 94007e93-31d8-4da5-8231-c7cafa0d363a Version: 13
    {'invocationId': '6bd3e736-2ad8-41d4-9485-a0aad1806990', 'deliveryStreamArn': 'arn:aws:firehose:us-east-2:200506027189:deliverystream/masa-ecs_monolith-firehose-extended-s3-firehose-click-stream', 'region': 'us-east-2', 'records': [{'recordId': '49605256299907973028537486643826326105740520545077690370000000', 'approximateArrivalTimestamp': 1584590301809, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}, {'recordId': '49605256299907973028537486643827535031560135311691350018000000', 'approximateArrivalTimestamp': 1584590303745, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}, {'recordId': '49605256299907973028537486643828743957379750009585532930000000', 'approximateArrivalTimestamp': 1584590305222, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}]}
    
    Processing record: 49605256299907973028537486643826326105740520545077690370000000
    {
        "userId": "2",
        "mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
        "goodevil": "Neutral",
        "lawchaos": "Lawful",
        "species": "Cyclops"
    }
    Processing record: 49605256299907973028537486643827535031560135311691350018000000
    {
        "userId": "2",
        "mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
        "goodevil": "Neutral",
        "lawchaos": "Lawful",
        "species": "Cyclops"
    }
    Processing record: 49605256299907973028537486643828743957379750009585532930000000
    {
        "userId": "2",
        "mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
        "goodevil": "Neutral",
        "lawchaos": "Lawful",
        "species": "Cyclops"
    }
    Successfully processed 3 records.
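The three put-record calls above sent the same `Data`, so all three records in the logged event carry the same base64 string. Decoding it gives back the `Data` string from `data.json` exactly, which shows Firehose passes the bytes through and only base64-encodes them for the Lambda event. A small check in Python:

```python
import base64
import json

# "Data" string from data.json, exactly as sent with put-record.
SENT = '{"userId": "2","mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec"}'
# "data" value of each record in the Firehose event logged above.
RECEIVED = "eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9"

decoded = base64.b64decode(RECEIVED).decode("utf-8")
print(decoded == SENT)      # True
print(json.loads(decoded))  # {'userId': '2', 'mysfitId': '2b473002-36f8-4b87-954e-9a377e0ccbec'}
```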
    

    References

    I am afraid the AWS Firehose documentation is so poorly written that it does not serve as a technical document.

    To avoid spending time in vain, I would personally go through blogs and GitHub repositories rather than the AWS Firehose documentation.

    I do hope AWS seriously improves the documentation so that we do not have to search GitHub and blogs and experiment so much.