I have a very strange issue using spring-integration, the Kinesis binder, and localstack (a local AWS environment) from testcontainers. I have 2 channels, A and B, and each channel has both a producer and a consumer. My tests are pretty simple: I send a message and validate that the consumer received it. I wrote 1 test for each channel, and both worked. A third test, which also uses channel A, fails because the consumer never receives the message.
This is the log from the third test:
2020-05-14 17:27:40.654 INFO 29076 --- [ main] com.blabla.kinesis.demo.MessageProducer : Sending StatusEvent(status=message2)
2020-05-14 17:27:40.663 DEBUG 29076 --- [ main] o.s.i.a.outbound.KinesisMessageHandler : org.springframework.integration.aws.outbound.KinesisMessageHandler@5895c065 received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=de100ebb-518c-2025-7d71-99c94cd8207c, timestamp=1589466460663}]
2020-05-14 17:27:41.455 INFO 29076 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='input', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2020-05-14 17:27:42.661 DEBUG 29076 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : No records for [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='input', shard='shardId-000000000000', reset=false}, state=CONSUME}] on sequenceNumber [null]. Suspend consuming for [1000] milliseconds.
From the log I can see that the producer sent the message, but the consumer started only after that. If I send another message first, the second message is consumed without any problem, and the first one disappears.
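One possible reading of the log: the shard consumer reports iteratorType=LATEST, and a LATEST iterator starts just after the most recent record in the shard, so a record written before the iterator is obtained is never returned. The sketch below is only a toy in-memory model of that ordering effect, not the Kinesis API; the `Shard` class and its method names are hypothetical and exist purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class LatestIteratorSketch {

    /** Hypothetical in-memory stand-in for a single shard (not the AWS SDK). */
    static final class Shard {
        private final List<String> records = new ArrayList<>();

        void put(String record) {
            records.add(record);
        }

        /** LATEST-style position: just after the most recent record. */
        int latestIterator() {
            return records.size();
        }

        /** TRIM_HORIZON-style position: the oldest record still in the shard. */
        int trimHorizonIterator() {
            return 0;
        }

        /** Returns every record at or after the given position. */
        List<String> readFrom(int position) {
            return new ArrayList<>(records.subList(position, records.size()));
        }
    }

    public static void main(String[] args) {
        Shard shard = new Shard();

        // The producer sends before the consumer has obtained its iterator.
        shard.put("message1");

        // The consumer starts only now.
        int latest = shard.latestIterator();
        int trimHorizon = shard.trimHorizonIterator();

        // A second message arrives after the consumer has started.
        shard.put("message2");

        System.out.println(shard.readFrom(latest));      // prints [message2]
        System.out.println(shard.readFrom(trimHorizon)); // prints [message1, message2]
    }
}
```

In this model the first message is lost to the LATEST-style reader but not to the TRIM_HORIZON-style one, which matches the symptom of the first message disappearing while the second one is consumed.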
My pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blabla.kinesis</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>localstack</artifactId>
            <version>1.14.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-core</artifactId>
                <version>1.11.415</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
After hours of investigation I found the problem. The root cause was that my configuration defined only the "sync" clients, like the following:
@Autowired
private LocalStackContainer localStackContainer;

// Synchronous Kinesis client pointed at the LocalStack endpoint
@Bean
public AmazonKinesis kinesisClient() {
    return AmazonKinesisClientBuilder.standard()
            .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(KINESIS))
            .build();
}

// Synchronous DynamoDB client pointed at the LocalStack endpoint
@Bean
public AmazonDynamoDB dynamoDBClient() {
    return AmazonDynamoDBClientBuilder.standard()
            .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(DYNAMODB))
            .build();
}
After turning on all debug logs, I noticed that Spring was making API calls to the "real" AWS using AmazonKinesisAsyncClient. The following configuration fixed the issue:
// Async DynamoDB client, also pointed at the LocalStack endpoint
@Bean
public AmazonDynamoDB dynamoDBClientAsync() {
    return AmazonDynamoDBAsyncClientBuilder.standard()
            .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(DYNAMODB))
            .build();
}

// Async Kinesis client, so these calls go to LocalStack instead of the real AWS
@Bean
public AmazonKinesis kinesisClientAsync() {
    return AmazonKinesisAsyncClientBuilder.standard()
            .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(KINESIS))
            .build();
}