Search code examples
spring-boot, spring-integration, spring-integration-aws

spring boot and spring integration aws kinesis error


I have set up Spring Boot and spring-integration-aws with Kinesis, and I keep getting the error below repeatedly when the application starts up.

  1. I don't know if my config is correct or not?

  2. Is my maven pom.xml correct?

  3. It would be good if spring.io could provide an end-to-end sample application for Spring Integration and AWS Kinesis.

Any help eliminating this error would be appreciated. It is repeated continuously.

Thank you in advance.

2018-03-21 19:38:20.763  INFO 8417 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception when describing stream [TestStream]. Backing off for [1000] millis.

com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found. (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: 694ca3a0-2d3f-11e8-b393-0de7c3ab0fa7)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1630) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1302) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2388) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2364) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:754) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:729) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:467) ~[spring-integration-aws-2.0.0.BUILD-20180319.195200-12.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_91]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_91]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]

/**
 * Spring Integration wiring that consumes the {@code TestStream} Kinesis stream from a
 * Kinesis-compatible endpoint on {@code localhost} (see the kinesalite link below).
 *
 * <p>Records received by the adapter are sent to {@link #kinesisReceiveChannel()};
 * failures are sent to {@link #errorChannel()}.
 *
 * <p>Note: nothing in this class creates the stream. It must already exist at the
 * endpoint, otherwise the adapter's describe-stream call fails with
 * {@code ResourceNotFoundException}.
 */
@Configuration
@EnableIntegration
public class ReceiverConfig {

    /** Port of the local Kinesis endpoint the client is pointed at. */
    private final int port = 4567;

    /**
     * Builds the inbound adapter for {@code TestStream}.
     *
     * @param amazonKinesis the Kinesis client the adapter reads with
     * @return an adapter wired to the receive channel and the error channel
     */
    private KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis) {
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "TestStream");
        adapter.setOutputChannel(kinesisReceiveChannel());
        adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
        adapter.setErrorChannel(errorChannel());
        return adapter;
    }

    /**
     * Creates the Kinesis client pointed at {@code http://localhost:<port>}.
     *
     * <p>Side effect: sets the JVM-wide system property that disables CBOR in the AWS SDK.
     * The property is left set afterwards (the clearing call below is commented out).
     *
     * @return the client, configured with empty static credentials, no SDK-level retries
     *         and a connection timeout of 1000
     */
    @Bean
    public AmazonKinesis amazonKinesis() {
        AmazonKinesis amazonKinesis;

        String url = "http://localhost:" + this.port;

        // See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
        System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

        amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
                .withClientConfiguration(
                        new ClientConfiguration()
                                .withMaxErrorRetry(0)
                                .withConnectionTimeout(1000))
                .withEndpointConfiguration(
                        new AwsClientBuilder.EndpointConfiguration(
                                url,
                                Regions.DEFAULT_REGION.getName()))
                .build();

        //System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);

        return amazonKinesis;
    }

    /**
     * Exposes the {@code TestStream} inbound adapter as a bean.
     *
     * @param amazonKinesis the Kinesis client bean
     * @return the configured adapter
     */
    @Bean
    public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel1(AmazonKinesis amazonKinesis) {
        return kinesisMessageDrivenChannelAdapter(amazonKinesis);
    }

    /**
     * Queue channel receiving the adapter's output; restricted to {@code String} payloads.
     *
     * @return the pollable receive channel
     */
    @Bean
    public PollableChannel kinesisReceiveChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.setDatatypes(String.class);
        return queueChannel;
    }

    /**
     * Queue channel receiving adapter errors. After an {@link ErrorMessage} has been sent,
     * its payload is rethrown to the sender.
     *
     * @return the pollable error channel
     */
    @Bean
    public PollableChannel errorChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.addInterceptor(new ChannelInterceptorAdapter() {

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                super.postSend(message, channel, sent);

                if (message instanceof ErrorMessage) {
                    // The payload is a Throwable, not necessarily a RuntimeException.
                    // Rethrow by type so the original failure is never masked by a
                    // ClassCastException.
                    Throwable failure = ((ErrorMessage) message).getPayload();
                    if (failure instanceof RuntimeException) {
                        throw (RuntimeException) failure;
                    }
                    if (failure instanceof Error) {
                        throw (Error) failure;
                    }
                    throw new IllegalStateException("Checked exception received on errorChannel", failure);
                }
            }
        });
        return queueChannel;
    }
}



/**
 * Endpoint that drains the {@code kinesisReceiveChannel} and prints every payload.
 */
@Component
public class EventProcessor {

    /**
     * Writes the received event to standard output, prefixed with {@code "event: "}.
     * The channel is polled with a fixed rate of {@code 1000}.
     *
     * @param event the message payload taken from {@code kinesisReceiveChannel}
     */
    @ServiceActivator(
            inputChannel = "kinesisReceiveChannel",
            poller = @Poller(fixedRate = "1000"))
    public void onEvent(String event) {
        final String line = "event: " + event;
        System.out.println(line);
    }
}


/**
 * Application entry point. On startup it makes sure the {@code TestStream} Kinesis stream
 * exists at the configured endpoint, creating it with a single shard when it is missing.
 */
@SpringBootApplication
public class KreceiverApplication implements CommandLineRunner {

    private static final Log logger = LogFactory.getLog(KreceiverApplication.class);

    /** Name of the stream to create when it is missing. */
    private static final String TEST_STREAM = "TestStream";

    @Autowired
    private AmazonKinesis amazonKinesis;

    public static void main(String[] args) {
        SpringApplication.run(KreceiverApplication.class, args);
    }

    /**
     * Creates {@code TestStream} if the endpoint does not list it yet.
     *
     * <p>Kinesis streams are never created on demand; without this step, describing the
     * stream fails with {@code ResourceNotFoundException}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared by {@link CommandLineRunner}; SDK failures are logged,
     *         not propagated
     */
    @Override
    public void run(String... args) throws Exception {

        try {
            // Only the first page of stream names is inspected here.
            // NOTE(review): handle pagination if the endpoint may hold many streams.
            if (!this.amazonKinesis.listStreams().getStreamNames().contains(TEST_STREAM)) {
                this.amazonKinesis.createStream(TEST_STREAM, 1);
            }
        } catch (SdkClientException e) {
            logger.warn("Tests not running because no Kinesis on ...", e);
        }
    }
}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the kreceiver demo: Spring Boot web + Spring Integration
         with the spring-integration-aws Kinesis support and the AWS Kinesis SDK. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kreceiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kreceiver</name>
    <description>Demo project for Spring Boot</description>

    <!-- Versions of the Spring Boot starters below are inherited from this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <spring-cloud-aws.version>1.2.2.RELEASE</spring-cloud-aws.version>
        <!-- NOTE(review): this is a snapshot version and no <repositories> section is
             declared in this POM, so a snapshot repository presumably has to be
             configured elsewhere (e.g. settings.xml); confirm. -->
        <spring-integration-aws-version>2.0.0.BUILD-SNAPSHOT</spring-integration-aws-version>

        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Provides KinesisMessageDrivenChannelAdapter used by ReceiverConfig. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-aws</artifactId>
            <version>${spring-integration-aws-version}</version>
        </dependency>

        <!-- NOTE(review): the "-dependencies" artifact looks like a BOM (pom packaging).
             BOMs are normally imported under <dependencyManagement> with
             <type>pom</type> and <scope>import</scope> rather than declared as a plain
             dependency; confirm whether this entry is needed at all. Importing it may
             also change the managed AWS SDK versions. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>${spring-cloud-aws.version}</version>
        </dependency>

        <!-- AWS SDK Kinesis client, pinned explicitly. -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.297</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

Solution

  • It looks like you are going to test your solution against kinesalite, according to your comments in the amazonKinesis() method, and you have an error in the stack trace like:

     com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found.
    

    That really says that there is no TestStream stream in that local kinesalite resource.

    Not sure what you would like to hear from Spring IO, but what you have is a conceptual error in using AWS Kinesis per se: the stream must exist in Kinesis before you start using it. That is the rule for any AWS resource: it is not going to be created on demand.

    You can do that yourself, of course. And that is exactly what we do in our integration tests:

    KINESIS_LOCAL_RUNNING.getKinesis().createStream(TEST_STREAM, 1);
    

    I would suggest adding code like this (note that createStream takes the stream name and the shard count):

    this.amazonKinesis.createStream(TEST_STREAM, 1);
    

    to your run(String... args) method.