aws-lambda amazon-sqs amazon-kinesis

Kinesis with SQS DLQ missing event data


I'm trying to set up a DLQ for a Kinesis stream. I created an SQS queue and set it as the on-failure destination for the Kinesis event source.

The stream is attached to a Lambda function that always throws an error, so every event goes straight to the SQS DLQ.

I can see the events in SQS, but the payload of the event (the JSON I send as part of the event) is missing. If I print the event in the Lambda before throwing the exception, I can see the base64-encoded data, but it is not in my DLQ.

Is there a way to send the event data to the DLQ as well? I want to be able to examine the cause of the error properly and put the event back on the stream once I have fixed the issue in the Lambda.


Solution

  • https://docs.aws.amazon.com/lambda/latest/dg//with-kinesis.html#services-kinesis-errors

    The actual records aren't included, so you must process this record and retrieve them from the stream before they expire and are lost.

    According to the documentation quoted above, the record payload is not included in the message sent to the DLQ, so the "missing event data" is expected here.

    Therefore, to retrieve the actual records, you might want to try something like the following.

    1) Assuming the DLQ message contains the following Kinesis batch info:

    {
      "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
      }
    }
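The batch info above already carries everything needed to read the failed batch back from the stream. As a minimal sketch, the helper below (the name `toReadParams` and the shape of its return value are illustrative assumptions, not part of any AWS API) parses one DLQ message body and derives the `getShardIterator` parameters from it. It assumes the stream name is the part of `streamArn` after `stream/`.

```javascript
// Hypothetical helper: turn the body of one DLQ message into the parameters
// needed to read the failed batch back from the stream.
function toReadParams(messageBody) {
  const { KinesisBatchInfo: info } = JSON.parse(messageBody);
  if (!info) {
    throw new Error('Message body has no KinesisBatchInfo');
  }
  // streamArn looks like arn:aws:kinesis:<region>:<account>:stream/<name>
  const StreamName = info.streamArn.split(':stream/')[1];
  return {
    // Parameters for kinesis.getShardIterator
    iterator: {
      StreamName,
      ShardId: info.shardId,
      ShardIteratorType: 'AT_SEQUENCE_NUMBER',
      StartingSequenceNumber: info.startSequenceNumber,
    },
    // Kept so the caller knows where the failed batch ends
    endSequenceNumber: info.endSequenceNumber,
    batchSize: info.batchSize,
  };
}

// Example with the batch info shown above
const sampleBody = JSON.stringify({
  KinesisBatchInfo: {
    shardId: 'shardId-000000000001',
    startSequenceNumber: '49601189658422359378836298521827638475320189012309704722',
    endSequenceNumber: '49601189658422359378836298522902373528957594348623495186',
    batchSize: 500,
    streamArn: 'arn:aws:kinesis:us-east-2:123456789012:stream/mystream',
  },
});
console.log(toReadParams(sampleBody));
```

Deriving the parameters from the message rather than hard-coding them lets the same code handle every message in the queue.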
    

    2) We can get the records back by doing something like this:

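    // AWS SDK for JavaScript v2; top-level await requires an ES module
    import AWS from 'aws-sdk';
    
    const kinesis = new AWS.Kinesis();
    
    // Values taken from the KinesisBatchInfo above
    const ShardId = 'shardId-000000000001';
    const ShardIteratorType = 'AT_SEQUENCE_NUMBER';
    // The stream name is the part of streamArn after "stream/"
    const StreamName = 'mystream';
    const StartingSequenceNumber =
      '49601189658422359378836298521827638475320189012309704722';
    
    // Get an iterator positioned at the first record of the failed batch
    const { ShardIterator } = await kinesis
      .getShardIterator({
        ShardId,
        ShardIteratorType,
        StreamName,
        StartingSequenceNumber,
      })
      .promise();
    
    // Read at most batchSize records; a single call may return fewer
    const { Records } = await kinesis
      .getRecords({
        ShardIterator,
        Limit: 500,
      })
      .promise();
    
    console.log('Records', Records);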
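The `getRecords` call above can return records that lie beyond the failed batch, and the payloads come back as binary data. The sketch below keeps only the records between `startSequenceNumber` and `endSequenceNumber` and decodes each payload. `decodeBatch` is a hypothetical helper. It assumes that `Data` is a `Buffer` (as returned by the v2 SDK in Node.js) and that the payload is UTF-8 JSON, as described in the question.

```javascript
// Hypothetical helper: keep only the records of the failed batch and decode
// their payloads. `records` is the Records array of a getRecords response.
function decodeBatch(records, startSequenceNumber, endSequenceNumber) {
  // Sequence numbers are long decimal strings, so compare them as BigInt
  const start = BigInt(startSequenceNumber);
  const end = BigInt(endSequenceNumber);
  return records
    .filter((record) => {
      const seq = BigInt(record.SequenceNumber);
      return seq >= start && seq <= end;
    })
    .map((record) => ({
      sequenceNumber: record.SequenceNumber,
      partitionKey: record.PartitionKey,
      // Assumption: Data is a Buffer (or Uint8Array) holding UTF-8 JSON
      payload: JSON.parse(Buffer.from(record.Data).toString('utf8')),
    }));
}
```

Pass it the `Records` array from the `getRecords` response together with the two sequence numbers from the DLQ message.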
    

    NOTE: make sure your process has permission for 1) kinesis:GetShardIterator and 2) kinesis:GetRecords.
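For the second half of the question (putting the events back on the stream after the Lambda is fixed), one possible approach is to resend the fetched records with `putRecords`. The sketch below only builds the request parameters. `toPutRecordsRequests` is a hypothetical helper, and the chunk size of 500 reflects the per-request record limit of `PutRecords`. The process would also need the `kinesis:PutRecords` permission, and resent records receive new sequence numbers.

```javascript
// Hypothetical helper: build putRecords parameter objects that send fetched
// records back to the stream, at most `chunkSize` records per request.
function toPutRecordsRequests(streamName, records, chunkSize = 500) {
  const requests = [];
  for (let i = 0; i < records.length; i += chunkSize) {
    requests.push({
      StreamName: streamName,
      Records: records.slice(i, i + chunkSize).map((record) => ({
        // Reuse the original payload and partition key unchanged
        Data: record.Data,
        PartitionKey: record.PartitionKey,
      })),
    });
  }
  return requests;
}
```

Each returned object can then be sent with `await kinesis.putRecords(params).promise()`. Check `FailedRecordCount` in each response, since `putRecords` can partially fail.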

    Hope that helps!