Using Spring Boot and Spring Cloud Stream, I am trying to send messages to a Kinesis stream on a Localstack instance running in Docker. When the KPL POSTs the message, it fails with a certificate error, and I see the following in the logs:
[AWS Log: ERROR](CurlHttpClient)Curl returned error code 60 - Peer certificate cannot be authenticated with given CA certificates
Exception name:
Error message: Unable to connect to endpoint
0 response headers:
2022-09-11 03:20:59.089 INFO 99719 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader : [2022-09-11 03:20:59.089226] [0x00018612][0x00007f595bfff640] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
2022-09-11 03:20:59.143 WARN 99719 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader : [2022-09-11 03:20:59.143496] [0x00018612][0x00007f5981577640] [warning] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 60 - Peer certificate cannot be authenticated with given CA certificates
2022-09-11 03:20:59.143 WARN 99719 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader : [2022-09-11 03:20:59.143556] [0x00018612][0x00007f5981577640] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: -1
I also tried running it from another machine, and here are the new logs:
2022-09-11 17:12:29.293 INFO 2025772 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader : [2022-09-11 17:12:29.293064] [0x001eea80][0x00007f23737fe700] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
2022-09-11 17:12:29.470 INFO 2025772 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader : [2022-09-11 17:12:29.470523] [0x001eea80][0x00007f2373fff700] [info] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'UnrecognizedClientException': The security token included in the request is invalid.
2022-09-11 17:12:29.470 WARN 2025772 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader : [2022-09-11 17:12:29.470572] [0x001eea80][0x00007f2373fff700] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close
content-length : 107
content-type : application/x-amz-json-1.1
date : Sun, 11 Sep 2022 15:12:29 GMT
x-amz-id-2 : HHnAW475wVldQOYvBAccWb5IobzQIQ4xpYTUC6fSVaSE0L5fCX/RAAimOR7Ii7Gm1/Q6Ssd2zUiAQaUM3wQPY1sGhXcJnr1D
x-amzn-requestid : ecd98905-dfba-07c0-b74e-7418d2d0b2eb
I suspect it is connecting to the real AWS services rather than to Localstack, because I see the line Resolved remote host IP address: 3.91.171.213 in the following logs:
2022-09-12 01:13:53.335 WARN 2229288 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader : [2022-09-12 01:13:53.335070] [0x0022058d][0x00007f90d3fff700] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Resolved remote host IP address: 3.91.171.213
Request ID: ede9f5b6-bbb1-432e-b671-9847fa4df10d
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close
content-length : 107
content-type : application/x-amz-json-1.1
date : Sun, 11 Sep 2022 23:13:53 GMT
x-amz-id-2 : QozrFgS7zp3nKfs5X3wTkfj/dYBCTTEENBMTExhzOmXRGBBBa72Lu1ckuxz1JDrqWVgysVyb1v5+I4TBrIUbhf5B6+saa2k8
x-amzn-requestid : ede9f5b6-bbb1-432e-b671-9847fa4df10d
Here is my setup:
1- The localstack.yml file, which I use to set up my stack on Docker with the following command:
docker compose -f localstack.yml up
version: "3.8"
services:
  localstack:
    image: localstack/localstack:latest
    environment:
      - SERVICES=kinesis,dynamodb,cloudwatch
      - USE_SSL=false
      - AWS_DEFAULT_REGION=us-east-1
      - EDGE_PORT=4566
      - DATA_DIR=/tmp/localstack/data # Local directory for saving persistent data
      - DEBUG=1
      - KINESIS_INITIALIZE_STREAMS=my-test-delivery-stream:2
    ports:
      - "4566-4584:4566-4584"
    volumes:
      - "${TEMPDIR:-/tmp/localstack}:/tmp/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
  dynamodb-admin:
    image: aaronshaf/dynamodb-admin
    environment:
      - DYNAMO_ENDPOINT=localstack:4566
    ports:
      - 8001:8001
2- application.yml
server:
  port: 8080
spring:
  profiles:
    active: local
  application:
    name: my-producer
  cloud:
    function:
      definition: timeSupplier
    stream:
      function:
        bindings:
          timeSupplier-out-0: output
      bindings:
        output:
          destination: my-test-delivery-stream
          content-type: text/plain
        timeSupplier-out-0:
          destination: my-test-delivery-stream
          content-type: text/plain
      kinesis:
        binder:
          checkpoint:
            table: my-test-delivery-stream-checkpoint
          locks:
            table: my-test-delivery-stream-locks
          kpl-kcl-enabled: true
          auto-create-stream: true
          auto-add-shards: true
          min-shard-count: 1
      poller:
        # trigger calling each 3 seconds
        fixed-delay: 3000
cloud:
  aws:
    kinesis:
      endpoint: http://localhost:4566
    dynamo-db:
      endpoint: http://localhost:4566
    cloud-watch:
      endpoint: http://localhost:4566
    credentials:
      accessKey: dummy
      secretKey: dummy
      profile-name: dummy
    region:
      static: us-east-1
    stack:
      auto: true
logging:
  level:
    root: TRACE
3- AWS config local component
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;

@Configuration
@Profile("local")
public class AwsConfigLocal {

    @Value("${cloud.aws.region.static}")
    private String region;

    @Value("${cloud.aws.kinesis.endpoint}")
    private String kinesisEndpoint;

    @Value("${cloud.aws.dynamo-db.endpoint}")
    private String dynamoDbEndpoint;

    // Reads the CloudWatch endpoint from its own property rather than the DynamoDB one.
    @Value("${cloud.aws.cloud-watch.endpoint}")
    private String cloudWatchEndpoint;

    @Bean
    public AmazonKinesisAsync amazonKinesisAsync() {
        return AmazonKinesisAsyncClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(kinesisEndpoint, region))
                .build();
    }

    @Bean
    @Primary
    public AmazonDynamoDBAsync amazonDynamoDbAsync() {
        return AmazonDynamoDBAsyncClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(dynamoDbEndpoint, region))
                .build();
    }

    @Bean
    public AmazonCloudWatchAsync amazonCloudWatchAsync() {
        return AmazonCloudWatchAsyncClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(cloudWatchEndpoint, region))
                .build();
    }
}
4- My producer component, which holds the supplier bean definition
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class TestProducerConfiguration {

    @Bean
    public Supplier<String> timeSupplier() {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        // Build the timestamp inside the lambda so each poll sends the current time;
        // computing it outside would freeze the value at bean-creation time.
        return () -> {
            String dateTime = LocalDateTime.now().format(dtf);
            log.info("Sending message '{}' to kinesis", dateTime);
            return dateTime;
        };
    }
}
Note: I am using the following dependency version:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
    <version>2.2.0</version>
</dependency>
This has nothing to do with Spring Cloud Stream. It is just a matter of how the AWS KPL client and Localstack interact.
According to this GH issue, with the latest Localstack it looks like it is enough to call setVerifyCertificate(false) on the KinesisProducerConfiguration. It doesn't look like you have one at all. You can find one in my comment in that issue.
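For illustration, here is a minimal sketch of such a bean, reusing the property names from the application.yml in the question. The class name KplConfigLocal and the hard-coded dummy credentials are assumptions for this example, and it assumes that the binder picks up a user-defined KinesisProducerConfiguration bean when kpl-kcl-enabled is true. Note that the KPL takes host and port separately rather than a full URL, so the endpoint properties are split with java.net.URI.

```java
import java.net.URI;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

// Hypothetical class name; only active for the "local" profile used in the question.
@Configuration
@Profile("local")
public class KplConfigLocal {

    @Bean
    public KinesisProducerConfiguration kinesisProducerConfiguration(
            @Value("${cloud.aws.kinesis.endpoint}") String kinesisEndpoint,
            @Value("${cloud.aws.cloud-watch.endpoint}") String cloudWatchEndpoint,
            @Value("${cloud.aws.region.static}") String region) {

        // The KPL expects host and port separately, so split the configured URLs.
        URI kinesis = URI.create(kinesisEndpoint);
        URI cloudWatch = URI.create(cloudWatchEndpoint);

        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        // Dummy credentials (an assumption here); Localstack does not need real ones.
        config.setCredentialsProvider(
                new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy")));
        config.setRegion(region);
        config.setKinesisEndpoint(kinesis.getHost());
        config.setKinesisPort(kinesis.getPort());
        config.setCloudwatchEndpoint(cloudWatch.getHost());
        config.setCloudwatchPort(cloudWatch.getPort());
        // Skip certificate verification for the local endpoint.
        config.setVerifyCertificate(false);
        return config;
    }
}
```

Without an explicit endpoint on the KinesisProducerConfiguration, the KPL presumably falls back to the regional AWS endpoint, which would explain the resolved remote IP address and the UnrecognizedClientException in the logs above. Disabling certificate verification is only appropriate for a local setup like this one.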