Search code examples
spring-cloud-streamlocalstackamazon-kinesisspring-cloud-stream-binderspring-cloud-stream-binder-kinesis

Spring cloud stream Kinesis producer can't connect to localstack


Using spring boot and spring cloud stream, I am trying to send messages to Kinesis on localstack Kinesis instance run on docker but it gives me an error in certificates when KPL POST the message but i got the next error in logs:

[AWS Log: ERROR](CurlHttpClient)Curl returned error code 60 - Peer certificate cannot be authenticated with given CA certificates

Exception name: 
Error message: Unable to connect to endpoint
0 response headers:
2022-09-11 03:20:59.089  INFO 99719 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2022-09-11 03:20:59.089226] [0x00018612][0x00007f595bfff640] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
2022-09-11 03:20:59.143  WARN 99719 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2022-09-11 03:20:59.143496] [0x00018612][0x00007f5981577640] [warning] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 60 - Peer certificate cannot be authenticated with given CA certificates
2022-09-11 03:20:59.143  WARN 99719 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2022-09-11 03:20:59.143556] [0x00018612][0x00007f5981577640] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: -1

I also tried to run it from another machine and here is the new logs:

2022-09-11 17:12:29.293  INFO 2025772 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2022-09-11 17:12:29.293064] [0x001eea80][0x00007f23737fe700] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
2022-09-11 17:12:29.470  INFO 2025772 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2022-09-11 17:12:29.470523] [0x001eea80][0x00007f2373fff700] [info] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'UnrecognizedClientException': The security token included in the request is invalid.
2022-09-11 17:12:29.470  WARN 2025772 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2022-09-11 17:12:29.470572] [0x001eea80][0x00007f2373fff700] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close
content-length : 107
content-type : application/x-amz-json-1.1
date : Sun, 11 Sep 2022 15:12:29 GMT
x-amz-id-2 : HHnAW475wVldQOYvBAccWb5IobzQIQ4xpYTUC6fSVaSE0L5fCX/RAAimOR7Ii7Gm1/Q6Ssd2zUiAQaUM3wQPY1sGhXcJnr1D
x-amzn-requestid : ecd98905-dfba-07c0-b74e-7418d2d0b2eb

I feel it connects to AWS main amazon services not localstack services, because i see this line Resolved remote host IP address: 3.91.171.213 in the next logs:

2022-09-12 01:13:53.335  WARN 2229288 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2022-09-12 01:13:53.335070] [0x0022058d][0x00007f90d3fff700] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Resolved remote host IP address: 3.91.171.213
Request ID: ede9f5b6-bbb1-432e-b671-9847fa4df10d
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close
content-length : 107
content-type : application/x-amz-json-1.1
date : Sun, 11 Sep 2022 23:13:53 GMT
x-amz-id-2 : QozrFgS7zp3nKfs5X3wTkfj/dYBCTTEENBMTExhzOmXRGBBBa72Lu1ckuxz1JDrqWVgysVyb1v5+I4TBrIUbhf5B6+saa2k8
x-amzn-requestid : ede9f5b6-bbb1-432e-b671-9847fa4df10d

Here are my setup:

1- localstack.yml file which i am using to setup my stack on docker using the next command

docker compose -f localstack.yml up

version: "3.8"

services:
  localstack:
    image: localstack/localstack:latest
    environment:
      - SERVICES=kinesis,dynamodb,cloudwatch
      - USE_SSL=false
      - AWS_DEFAULT_REGION=us-east-1
      - EDGE_PORT=4566
      - DATA_DIR=/tmp/localstack/data # Local directory for saving persistent data
      - DEBUG=1
      - KINESIS_INITIALIZE_STREAMS=my-test-delivery-stream:2
    ports:
      - "4566-4584:4566-4584"
    volumes:
      - "${TEMPDIR:-/tmp/localstack}:/tmp/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
  dynamodb-admin:
    image: aaronshaf/dynamodb-admin
    environment:
      - DYNAMO_ENDPOINT=localstack:4566
    ports:
      - 8001:8001

2- application.yml

server:
  port: 8080

spring:
  profiles:
    active: local
  application:
    name: my-producer
  cloud:
    function:
      definition: timeSupplier
    stream:
      function:
        bindings:
          timeSupplier-out-0: output

      bindings:
        output:
          destination: my-test-delivery-stream
          content-type: text/plain

        timeSupplier-out-0:
          destination: my-test-delivery-stream
          content-type: text/plain

      kinesis:
        binder:
          checkpoint:
            table: my-test-delivery-stream-checkpoint
          locks:
            table: my-test-delivery-stream-locks
          kpl-kcl-enabled: true
          auto-create-stream: true
          auto-add-shards: true
          min-shard-count: 1
      poller:
        # trigger calling each 3 seconds
        fixed-delay: 3000
cloud:
  aws:
    kinesis:
      endpoint: http://localhost:4566
    dynamo-db:
      endpoint: http://localhost:4566
    cloud-watch:
      endpoint: http://localhost:4566
    credentials:
      accessKey: dummy
      secretKey: dummy
      profile-name: dummy
    region:
      static: us-east-1
    stack:
      auto: true

logging:
  level:
    root: TRACE

3- AWS config local component

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;

@Configuration
@Profile("local")
public class AwsConfigLocal {

  @Value("${cloud.aws.region.static}")
  private String region;

  @Value("${cloud.aws.kinesis.endpoint}")

  private String kinesisEndpoint;

  @Value("${cloud.aws.dynamo-db.endpoint}")

  private String dynamoDbEndpoint;

  @Value("${cloud.aws.dynamo-db.endpoint}")

  private String cloudWatchEndpoint;

  @Bean
  public AmazonKinesisAsync amazonKinesisAsync() {
    return AmazonKinesisAsyncClientBuilder.standard()
        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(kinesisEndpoint, region))
        .build();
  }

  @Bean
  @Primary
  public AmazonDynamoDBAsync amazonDynamoDbAsync() {
    return AmazonDynamoDBAsyncClientBuilder.standard()
        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(dynamoDbEndpoint, region))
        .build();
  }

  @Bean
  public AmazonCloudWatchAsync amazonCloudWatchAsync() {
    return AmazonCloudWatchAsyncClientBuilder.standard()
        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(cloudWatchEndpoint, region))
        .build();
  }

}

4- My producer component which hold the creation of supplier bean

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class TestProducerConfiguration {

  @Bean
  public Supplier<String> timeSupplier() {
    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    String dateTime = LocalDateTime.now().format(dtf);
    log.info("Sending message '{}' to kinesis", dateTime);
    return () -> dateTime;
  }
  
}

Note: I am using the next dependency version

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
  <version>2.2.0</version>
</dependency>

Solution

  • This has nothing to do with Spring Cloud Stream. This is just a matter of AWS KPL client and Localstack interactions.

    According to this GH issue, it looks like with the latest Localstack there is just enough to setVerifyCertificate(false) on the KinesisProducerConfiguration. Doesn't look like you have one at all. You can find one in my comment in that issue.