Search code examples
amazon-kinesistestcontainerslocalstack

SSL error when testing kinesis with localstack + test containers


I'm running into an issue with trying to connect to kinesis running in a localstack container. I made a small example test using testcontainers but I get the same error in my application.

This is the error:

[kpl-daemon-0003] WARN com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407741] [0x00002233][0x0000700003935000] [warning] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 60 - SSL peer certificate or SSH remote key was not OK
[kpl-daemon-0003] WARN com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407806] [0x00002233][0x0000700003935000] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: -1
Resolved remote host IP address: 127.0.0.1
Request ID: 
Exception name: 
Error message: curlCode: 60, SSL peer certificate or SSH remote key was not OK
0 response headers:
[kpl-daemon-0003] INFO com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407825] [0x00002233][0x0000700003935000] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.

The error is repeated multiple times until the test finishes. How do I fix this error and get the connection working? I have tried updating my system. I opened a cli to the container and checked that the time matched my system time (can a container even have a different time from my system?).

This is the example test.

@Testcontainers
public class KinesisTest {
    static class TestProcessorFactory implements ShardRecordProcessorFactory {

        private final TestKinesisRecordService service;

        public TestProcessorFactory(TestKinesisRecordService service) {
            this.service = service;
        }

        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new TestRecordProcessor(service);
        }
    }

    static class TestRecordProcessor implements ShardRecordProcessor {

        public final TestKinesisRecordService service;

        public TestRecordProcessor(TestKinesisRecordService service) {
            this.service = service;
        }

        @Override
        public void initialize(InitializationInput initializationInput) {

        }

        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            service.addRecord(processRecordsInput);
        }

        @Override
        public void leaseLost(LeaseLostInput leaseLostInput) {

        }

        @Override
        public void shardEnded(ShardEndedInput shardEndedInput) {

        }

        @Override
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {

        }
    }

    static class TestKinesisRecordService {
        private List<ProcessRecordsInput> records = Collections.synchronizedList(new ArrayList<>());

        public void addRecord(ProcessRecordsInput processRecordsInput) {
            records.add(processRecordsInput);
        }

        public List<ProcessRecordsInput> getRecords() {
            return Collections.unmodifiableList(records);
        }
    }

    public static final String streamName = "stream-name";
    public static final String partitionKey = "partition-key";

    DockerImageName localstackImage = DockerImageName.parse("localstack/localstack:0.14.1");

    @Container
    public LocalStackContainer localstack = new LocalStackContainer(localstackImage)
            .withServices(KINESIS)
            .withEnv("KINESIS_INITIALIZE_STREAMS", streamName + ":1");

    public Scheduler scheduler;
    public TestKinesisRecordService service = new TestKinesisRecordService();
    public KinesisProducer producer;

    @BeforeEach
    void setup() {
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder().endpointOverride(localstack.getEndpointOverride(KINESIS)).region(Region.of(localstack.getRegion()))
        );
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(Region.of(localstack.getRegion())).endpointOverride(localstack.getEndpointOverride(DYNAMODB)).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(Region.of(localstack.getRegion())).endpointOverride(localstack.getEndpointOverride(CLOUDWATCH)).build();

        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, "KinesisPratTest", kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestProcessorFactory(service));

        scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        producer = producer();
    }

    @AfterEach
    public void teardown() throws ExecutionException, InterruptedException, TimeoutException {
        producer.destroy();
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
    }

    public KinesisProducer producer() {
        var configuration = new KinesisProducerConfiguration()
                .setRecordMaxBufferedTime(100)
                .setRecordTtl(30000)
                .setRequestTimeout(5000)
                .setCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setMetricsCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setRegion(localstack.getRegion())
                .setCloudwatchEndpoint(localstack.getEndpointOverride(CLOUDWATCH).getHost())
                .setCloudwatchPort(localstack.getEndpointOverride(CLOUDWATCH).getPort())
                .setKinesisEndpoint(localstack.getEndpointOverride(KINESIS).getHost())
                .setKinesisPort(localstack.getEndpointOverride(KINESIS).getPort());

        return new KinesisProducer(configuration);
    }

    @Test
    void test() {
        producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8)));
        await().until(() -> service.getRecords(), records -> records.size() > 0);
    }
}

Solution

  • Error message: curlCode: 60, SSL peer certificate or SSH remote key was not OK

    Sounds like the hostname doesn't match the cert, or that you don't trust the CA that signed it.

    You could set options to disable hostname verification with KinesisProducerConfig .setVerifyCertificate(false).

    Otherwise, see about fixing the cert/trust issue.