I am trying to run this simple kinesis message consumer with the following code. This is the only class in application
I am facing this error since, I have updated to latest snapshot version of kinesis binder
@SpringBootApplication
@RestController
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String message) {
System.out.println("Message has been received"+message);
}
}
Application yml
server.port: 8081
spring:
cloud:
stream:
bindings:
input:
destination: my.sink
content-type: application/json
cloud:
aws:
region:
static: us-east-1
credentials:
accessKey: <accessKey>
secretKey: <secretKey>
build.gradle
buildscript {
ext {
springBootVersion = '2.0.3.RELEASE'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.kinesis.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
compile('org.springframework.boot:spring-boot-starter')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.boot:spring-boot-starter-actuator')
compile('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.BUILD-SNAPSHOT')
testCompile('org.springframework.boot:spring-boot-starter-test')
}
I am getting the bean initialzation exception and there seems to be a problem in creating bean DynamoDbMetadataStore
.
2018-07-10 10:53:22.629 INFO 18332 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.lang.IllegalStateException: The component has not been initialized: DynamoDbMetadataStore{table={SpringIntegrationMetadataStore: {AttributeDefinitions: [{AttributeName: KEY,AttributeType: S}],TableName: SpringIntegrationMetadataStore,KeySchema: [{AttributeName: KEY,KeyType: HASH}],TableStatus: ACTIVE,CreationDateTime: Wed Jun 27 10:51:53 IST 2018,ProvisionedThroughput: {NumberOfDecreasesToday: 0,ReadCapacityUnits: 1,WriteCapacityUnits: 1},TableSizeBytes: 0,ItemCount: 0,TableArn: arn:aws:dynamodb:us-east-1:1234567:table/SpringIntegrationMetadataStore,TableId: d0cf588b-e122-406b-ad82-06255dfea6d4,}}, createTableRetries=25, createTableDelay=1, readCapacity=1, writeCapacity=1, timeToLive=null}.
Is it declared as a bean? during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='my.sink', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.
Process will be retried on the next iteration.
This error started after updating to latest snapshot version of kinesis binder.
Can you please check if some thing is wrong.
I have just fixed the issue: https://github.com/spring-projects/spring-integration-aws/commit/fc34f814e557936d1bcb815d0879bd4f6e035675
The problem was that when we have already a table in the DynamoDB, we just return from the afterPropertiesSet()
leaving the initialized
as false
.
The latest BUILD-SNAPSHOT
should work now.