I have set up Spring Boot and spring-integration-aws with Kinesis, and I keep getting the error below repeatedly when the application starts up.
I don't know whether my configuration is correct.
Is my Maven pom.xml correct?
It would be good if spring.io could provide an end-to-end sample application for spring-integration and AWS Kinesis.
Any help eliminating this error is appreciated. It repeats continuously.
Thank you in advance.
2018-03-21 19:38:20.763 INFO 8417 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception when describing stream [TestStream]. Backing off for [1000] millis.
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found. (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: 694ca3a0-2d3f-11e8-b393-0de7c3ab0fa7)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1630) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1302) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2388) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2364) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:754) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:729) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:467) ~[spring-integration-aws-2.0.0.BUILD-20180319.195200-12.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]
@Configuration
@EnableIntegration
public class ReceiverConfig {
private final int port = 4567;
private KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis) {
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(amazonKinesis, "TestStream");
adapter.setOutputChannel(kinesisReceiveChannel());
adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
adapter.setErrorChannel(errorChannel());
return adapter;
}
@Bean
public AmazonKinesis amazonKinesis() {
AmazonKinesis amazonKinesis;
String url = "http://localhost:" + this.port;
// See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
.withClientConfiguration(
new ClientConfiguration()
.withMaxErrorRetry(0)
.withConnectionTimeout(1000))
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
url,
Regions.DEFAULT_REGION.getName()))
.build();
//System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
return amazonKinesis;
}
@Bean
public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel1(AmazonKinesis amazonKinesis) {
return kinesisMessageDrivenChannelAdapter(amazonKinesis);
}
@Bean
public PollableChannel kinesisReceiveChannel() {
QueueChannel queueChannel = new QueueChannel();
queueChannel.setDatatypes(String.class);
return queueChannel;
}
@Bean
public PollableChannel errorChannel() {
QueueChannel queueChannel = new QueueChannel();
queueChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
super.postSend(message, channel, sent);
if (message instanceof ErrorMessage) {
throw (RuntimeException) ((ErrorMessage) message).getPayload();
}
}
});
return queueChannel;
}
}
@Component
public class EventProcessor {
@ServiceActivator(inputChannel = "kinesisReceiveChannel", poller = {@Poller(fixedRate = "1000")} )
public void onEvent(String event) {
System.out.println("event: " + event);
}
}
@SpringBootApplication
public class KreceiverApplication implements CommandLineRunner {
private static Log logger = LogFactory.getLog(KreceiverApplication.class);
private static final String TEST_STREAM = "TestStream";
@Autowired
private AmazonKinesis amazonKinesis;
public static void main(String[] args) {
SpringApplication.run(KreceiverApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
try {
this.amazonKinesis.listStreams();
} catch (SdkClientException e) {
logger.warn("Tests not running because no Kinesis on ...", e);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kreceiver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kreceiver</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud-aws.version>1.2.2.RELEASE</spring-cloud-aws.version>
<spring-integration-aws-version>2.0.0.BUILD-SNAPSHOT</spring-integration-aws-version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>${spring-integration-aws-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.11.297</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
It looks like you are testing your solution against kinesalite, according to the comment in your amazonKinesis() bean method. The error in the stack trace:
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found.
really does say that there is no TestStream stream in that local kinesalite instance.
Not sure what you would like to hear from Spring IO, but what you have is a conceptual error in using AWS Kinesis per se: the stream must exist in Kinesis before you start using it. That is the rule for any AWS resource: it is not going to be created on demand.
You can create it yourself, of course. That is exactly what we do in our integration tests:
KINESIS_LOCAL_RUNNING.getKinesis().createStream(TEST_STREAM, 1);
I would suggest adding code like this (the second argument is the shard count):
this.amazonKinesis.createStream(TEST_STREAM, 1);
to your run(String... args) method.
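Stream creation in Kinesis is asynchronous: a new stream reports CREATING for a while before it becomes ACTIVE. Here is a minimal sketch of a "create if missing, then wait" step, under some assumptions. The names StreamBootstrap, StreamAdmin, ensureActive and InMemoryAdmin are hypothetical and not part of spring-integration-aws or the AWS SDK. In a real application, status(...) could presumably be backed by AmazonKinesis.describeStream(...), returning empty on ResourceNotFoundException, and create(...) by AmazonKinesis.createStream(name, shardCount). The in-memory implementation exists only so the logic can run without Kinesis or kinesalite.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class StreamBootstrap {

    /** Hypothetical abstraction over the two Kinesis calls this sketch needs. */
    public interface StreamAdmin {
        /** Current status ("CREATING", "ACTIVE", ...) or empty if the stream does not exist. */
        Optional<String> status(String streamName);

        /** Requests creation of the stream; creation completes asynchronously. */
        void create(String streamName, int shardCount);
    }

    /**
     * Creates the stream if it is missing, then polls until it is ACTIVE.
     * Returns true if the stream became ACTIVE within maxAttempts polls.
     */
    public static boolean ensureActive(StreamAdmin admin, String streamName,
                                       int shardCount, int maxAttempts, long pollMillis) {
        if (!admin.status(streamName).isPresent()) {
            admin.create(streamName, shardCount);
        }
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if ("ACTIVE".equals(admin.status(streamName).orElse(null))) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /** In-memory stand-in (assumption: used only to exercise the logic locally). */
    public static class InMemoryAdmin implements StreamAdmin {
        private final Map<String, Integer> pollsUntilActive = new HashMap<>();
        public int createCalls = 0;

        @Override
        public Optional<String> status(String streamName) {
            Integer remaining = pollsUntilActive.get(streamName);
            if (remaining == null) {
                return Optional.empty();
            }
            if (remaining > 0) {
                // Simulate a stream that is still being created.
                pollsUntilActive.put(streamName, remaining - 1);
                return Optional.of("CREATING");
            }
            return Optional.of("ACTIVE");
        }

        @Override
        public void create(String streamName, int shardCount) {
            createCalls++;
            pollsUntilActive.put(streamName, 2); // reports CREATING twice, then ACTIVE
        }
    }

    public static void main(String[] args) {
        InMemoryAdmin admin = new InMemoryAdmin();
        boolean ready = ensureActive(admin, "TestStream", 1, 10, 10L);
        System.out.println("ready=" + ready + ", createCalls=" + admin.createCalls);
    }
}
```

Calling ensureActive a second time with the same admin does not create the stream again, so the step is safe to repeat on every startup. The adapter already backs off and retries describeStream on its own, as the log in the question shows, so the error should stop once the stream exists.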