Spring Integration, Kinesis binder, LocalStack: strange behaviour


I have a very strange issue using spring-integration, the Kinesis binder and LocalStack (a local AWS environment) from Testcontainers. I have 2 channels, A and B. Each channel has a producer and a consumer. My tests are pretty simple: I send a message and verify that the consumer received it. I wrote one test for each channel, and both passed. A third test, which also uses channel A, fails because the consumer doesn't receive the message.

This is the log from the third test:

2020-05-14 17:27:40.654  INFO 29076 --- [           main] com.blabla.kinesis.demo.MessageProducer  : Sending StatusEvent(status=message2)
2020-05-14 17:27:40.663 DEBUG 29076 --- [           main] o.s.i.a.outbound.KinesisMessageHandler   : org.springframework.integration.aws.outbound.KinesisMessageHandler@5895c065 received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=de100ebb-518c-2025-7d71-99c94cd8207c, timestamp=1589466460663}]
2020-05-14 17:27:41.455  INFO 29076 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='input', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2020-05-14 17:27:42.661 DEBUG 29076 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : No records for [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='input', shard='shardId-000000000000', reset=false}, state=CONSUME}] on sequenceNumber [null]. Suspend consuming for [1000] milliseconds.

From the log I see that the producer sent the message, but the consumer started only after that. If I send another message first, the second message is consumed without any problem, and the first one disappears.
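
To illustrate the symptom the log suggests, here is a toy in-memory model. It is only a sketch and not the Kinesis or Spring API: `ToyShard`, `openIterator` and `readFrom` are made-up names. It assumes the consumer's iterator behaves like the `iteratorType=LATEST` shown in the log, that is, it starts after the most recent record, so anything sent before the consumer started is never delivered to it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a single shard; NOT the real Kinesis API. */
class ToyShard {
    enum IteratorType { LATEST, TRIM_HORIZON }

    private final List<String> records = new ArrayList<>();

    /** Appends a record to the end of the shard. */
    void put(String data) {
        records.add(data);
    }

    /** Returns the position a newly started consumer begins reading from. */
    int openIterator(IteratorType type) {
        // LATEST starts after the last existing record, TRIM_HORIZON at the oldest one.
        return type == IteratorType.LATEST ? records.size() : 0;
    }

    /** Returns every record at or after the given position. */
    List<String> readFrom(int position) {
        return new ArrayList<>(records.subList(position, records.size()));
    }

    public static void main(String[] args) {
        ToyShard shard = new ToyShard();
        shard.put("message1");                        // sent before the consumer exists
        int latest = shard.openIterator(IteratorType.LATEST);
        int horizon = shard.openIterator(IteratorType.TRIM_HORIZON);
        shard.put("message2");                        // sent after the consumer started

        System.out.println(shard.readFrom(latest));   // [message2]
        System.out.println(shard.readFrom(horizon));  // [message1, message2]
    }
}
```

In this model the first message is lost to the `LATEST` reader and the second one arrives, which matches what I see in the test.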

My pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blabla.kinesis</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>localstack</artifactId>
            <version>1.14.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <dependencyManagement>

        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-core</artifactId>
                <version>1.11.415</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Solution

  • After hours of investigation I found the problem. The root cause was that my configuration defined only the "sync" clients, like this:

    @Autowired
    private LocalStackContainer localStackContainer;
    
    @Bean
    public AmazonKinesis kinesisClient() {
        return AmazonKinesisClientBuilder.standard()
                .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(KINESIS))
                .build();
    }
    
    @Bean
    public AmazonDynamoDB dynamoDBClient() {
        return AmazonDynamoDBClientBuilder.standard()
                .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(DYNAMODB))
                .build();
    }
    

    After turning on all debug logs, I noticed that Spring was making API calls to the "real" AWS through AmazonKinesisAsyncClient. The following configuration fixed the issue:

    // Async DynamoDB client pointed at the LocalStack endpoint
    @Bean
    public AmazonDynamoDB dynamoDBClientAsync() {
        return AmazonDynamoDBAsyncClientBuilder.standard()
                .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(DYNAMODB))
                .build();
    }

    // Async Kinesis client pointed at the LocalStack endpoint
    @Bean
    public AmazonKinesis kinesisClientAsync() {
        return AmazonKinesisAsyncClientBuilder.standard()
                .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(KINESIS))
                .build();
    }