Search code examples
amazon-web-servicesterraformamazon-cloudwatchamazon-kinesis

Cloudwatch Subscription Filter does not ingest to Kinesis Data Stream


I am trying to forward Cloudwatch logs to Kinesis Data Stream using the Subscription filter.
Even with broad permissions, I am not able to see any records in Kinesis. I can see the logs in the CloudWatch log group, but they are not being forwarded to the Kinesis Data Stream. The Lambda function is a basic one with just a couple of console.log statements.

Here is the terraform code for the stack.

# Execution role for the sample Lambda function.
resource "aws_iam_role" "sample_lambda_role" {
  name = "sample_lambda_role"

  # Trust policy: lets the Lambda service (lambda.amazonaws.com) assume this role.
  assume_role_policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
POLICY
}

# Logging permissions for the Lambda role: create log groups/streams and put log events.
resource "aws_iam_policy" "sample_lambda_policy" {
  description = "AWS IAM Policy for managing aws lambda role"
  name        = "sample_lambda_policy"
  path        = "/"

  # The statement applies to every CloudWatch Logs ARN (arn:aws:logs:*:*:*).
  policy = <<POLICY
{
 "Version": "2012-10-17",
 "Statement": [
   {
     "Action": [
       "logs:CreateLogGroup",
       "logs:CreateLogStream",
       "logs:PutLogEvents"
     ],
     "Resource": "arn:aws:logs:*:*:*",
     "Effect": "Allow"
   }
 ]
}
POLICY
}

# Attaches sample_lambda_policy to sample_lambda_role.
resource "aws_iam_role_policy_attachment" "sample_lambda_attach_iam_policy_to_iam_role" {
  policy_arn = aws_iam_policy.sample_lambda_policy.arn
  role       = aws_iam_role.sample_lambda_role.name
}

# Packages the sample-lambda directory of this module into sample-lambda.zip.
data "archive_file" "lambda_app_zip" {
  type        = "zip"
  output_path = "${path.module}/sample-lambda.zip"
  source_dir  = "${path.module}/sample-lambda"

  # If the package is a single file, use source_file instead of source_dir:
  # source_file = "index.js"
}

# Sample Lambda function; its log group (/aws/lambda/sample-lambda) is the
# source of the CloudWatch Logs subscription filter defined in this stack.
resource "aws_lambda_function" "sample_lambda" {
  function_name = "sample-lambda"
  role          = aws_iam_role.sample_lambda_role.arn
  handler       = "index.handler"

  # NOTE(review): nodejs14.x is an old runtime; confirm it is still accepted by
  # AWS Lambda for new functions before applying.
  runtime = "nodejs14.x"

  # Read the package path from the archive data source instead of repeating the
  # literal, so the zip that is built and the zip that is uploaded cannot drift.
  filename         = data.archive_file.lambda_app_zip.output_path
  source_code_hash = data.archive_file.lambda_app_zip.output_base64sha256

  # Create the function only after the logging policy is attached to its role.
  depends_on = [
    aws_iam_role_policy_attachment.sample_lambda_attach_iam_policy_to_iam_role,
  ]
}

# Log group for the sample Lambda, named /aws/lambda/<function_name>.
resource "aws_cloudwatch_log_group" "sample_lambda_function_log_group" {
  retention_in_days = 1
  name              = "/aws/lambda/${aws_lambda_function.sample_lambda.function_name}"

  lifecycle {
    # Destroying this log group through Terraform is explicitly allowed.
    prevent_destroy = false
  }
}

# Single-shard Kinesis data stream used as the subscription filter destination.
resource "aws_kinesis_stream" "log_stream" {
  name                = "terraform-kinesis-test"
  retention_period    = 24
  shard_count         = 1
  shard_level_metrics = ["IncomingBytes", "OutgoingBytes"]
}

# Role that CloudWatch Logs assumes to deliver log events to the destination.
resource "aws_iam_role" "cloudwatch_ingestion_role" {
  name = "cloudwatch_ingestion_role"

  # Trust policy: logs.amazonaws.com may assume the role when the request's
  # aws:SourceArn matches arn:aws:logs:*:*:*.
  assume_role_policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": [
          "logs.amazonaws.com"
        ]
      },
      "Effect": "Allow",
      "Sid": "",
      "Condition": { 
        "StringLike": { "aws:SourceArn": "arn:aws:logs:*:*:*" } 
      }
    }
  ]
}
POLICY
}

# Permissions for the CloudWatch Logs ingestion role.
# NOTE(review): deliberately broad (every Kinesis action on every stream) while
# debugging; narrow to the actions and stream actually needed once it works.
resource "aws_iam_policy" "cloudwatch_ingestion_policy" {
  description = "AWS IAM Policy for cloudwatch logs ingestion"
  name        = "cloudwatch_ingestion_policy"
  path        = "/"

  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "kinesis:*"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/*",
      "Effect": "Allow"
    }
  ]
}
POLICY
}


# Attaches cloudwatch_ingestion_policy to cloudwatch_ingestion_role.
resource "aws_iam_role_policy_attachment" "cloudwatch_ingestion_attach_iam_policy_to_iam_role" {
  policy_arn = aws_iam_policy.cloudwatch_ingestion_policy.arn
  role       = aws_iam_role.cloudwatch_ingestion_role.name
}

# Forwards every event from the sample Lambda's log group to the Kinesis stream.
resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter" {
  name            = "sample_lambda_function_logfilter"
  log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  destination_arn = aws_kinesis_stream.log_stream.arn
  role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
  distribution    = "ByLogStream"

  # An empty pattern matches all log events.
  # To forward only selected events, use e.g.: filter_pattern = "logtype test"
  filter_pattern = ""

  # role_arn only orders this resource after the role itself, not after its
  # policy attachment. Wait for the attachment so the role can already write to
  # Kinesis when the filter is created.
  depends_on = [
    aws_iam_role_policy_attachment.cloudwatch_ingestion_attach_iam_policy_to_iam_role,
  ]
}

Solution

  • I noticed discussions around the filter_pattern value in the comment section, so I conducted experiments with and without a space in the subscription filter pattern value and observed that both filter patterns behave the same.

    Here is the Terraform script I used:

    # Experiment: subscription filter whose pattern is a single space.
    resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
      name            = "sample_lambda_function_logfilter_with_space"
      log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
      destination_arn = aws_kinesis_stream.log_stream.arn
      role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
      distribution    = "ByLogStream"
      filter_pattern  = " "

      # Create the filter only after the role's Kinesis policy is attached.
      depends_on = [
        aws_iam_role_policy_attachment.cloudwatch_ingestion_attach_iam_policy_to_iam_role,
      ]
    }
    
    # Experiment: subscription filter whose pattern is the empty string.
    resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
      name            = "sample_lambda_function_logfilter_without_space"
      log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
      destination_arn = aws_kinesis_stream.log_stream.arn
      role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
      distribution    = "ByLogStream"
      filter_pattern  = ""

      # Create the filter only after the role's Kinesis policy is attached.
      depends_on = [
        aws_iam_role_policy_attachment.cloudwatch_ingestion_attach_iam_policy_to_iam_role,
      ]
    }
    

    Terraform plan output:

      # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_with_space will be created
      + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
          + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
          + distribution    = "ByLogStream"
          + filter_pattern  = " "
          + id              = (known after apply)
          + log_group_name  = "/aws/lambda/sample-lambda"
          + name            = "sample_lambda_function_logfilter_with_space"
          + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
        }
    
      # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_without_space will be created
      + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
          + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
          + distribution    = "ByLogStream"
          + id              = (known after apply)
          + log_group_name  = "/aws/lambda/sample-lambda"
          + name            = "sample_lambda_function_logfilter_without_space"
          + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
        }
    

    Note that if we pass an empty filter pattern (no space), Terraform does not show a filter_pattern value in the plan output. However, CloudWatch Logs treats this subscription filter the same as the other one.

    In the AWS Console, as depicted in the image, both subscription filters render the same pattern value:

    enter image description here

    Hence, we can rule out any confusion regarding the subscription filter pattern value in the mentioned Terraform script.


    So, what could be the issue now? My main suspicion is that the way you're filtering records on the Kinesis data stream is wrong. I suspect you might have used the Latest ShardIteratorType (starting position drop-down) when fetching records in the Data Viewer.

    The Latest ShardIteratorType shows only records that arrive after the most recent record in the shard. Considering your sequence of actions, it's likely that you executed your Lambda function first and then attempted to filter records within the Data Viewer. Because of the time lag between these actions, when you filter the records using the Latest starting position, Kinesis places the data pointer after the recently published CloudWatch logs, and that is why you couldn't see any records in Kinesis.


    AWS CLI Commands

    To fetch the records using the Latest ShardIteratorType, follow these steps:

    First, execute the following command. This will create a data pointer after the most recent records present in your shard:

    # Get an iterator positioned after the most recent record in the shard.
    aws kinesis get-shard-iterator \
        --shard-iterator-type LATEST \
        --stream-name terraform-kinesis-test \
        --shard-id shardId-000000000000
    

    Command Output:

    {
        "ShardIterator": "AAAAAAAAAAGiKQ..."
    }
    

    Now, you can execute your lambda function to produce some CloudWatch logs. The logs then will be sent to Kinesis through the newly created subscription filter.

    Next, execute the following command to fetch records from the Kinesis data stream using the shard-iterator value that you retrieved earlier.

    # Fetch up to 10 records, starting at the shard iterator obtained earlier.
    aws kinesis get-records \
        --shard-iterator "AAAAAAAAAAGiKQ..." \
        --limit 10
    

    Command Output:

    {
        "Records": [
            {
                "SequenceNumber": "49643477757265957414492357197584820922864438932158808066",
                "ApproximateArrivalTimestamp": "2023-08-10T23:43:56.704000+00:00",
                "Data": "H4sIAAAAAAAA/...",
                "PartitionKey": "f656f4eedc671f9bd3cea60ef85e599c"
            }
        ],
        "NextShardIterator": "AAAAAAAAAAFOa...",
        "MillisBehindLatest": 0
    }
    

    The Data field of each record is base64-encoded and GZIP-compressed, and it contains the CloudWatch log data, so use the following command to retrieve the actual value.

    # Base64-decode the record's Data value, then decompress the GZIP payload.
    printf '%s' "<BASE64ENCODED_GZIP_COMPRESSED_DATA>" | base64 --decode | gzip -dc
    

    The steps outlined above will help you retrieve records using the Latest ShardIteratorType via the AWS CLI. However, if you want to view the records directly in the AWS Console's Data Viewer section, you can use one of the other ShardIteratorTypes.

    For instance, when utilizing the TRIM_HORIZON starting position, the data will appear as shown in the image below:

    enter image description here

    To learn more about the ShardIteratorTypes, refer to this link.