Search code examples
spring-integrationspring-cloudspring-cloud-streamspring-cloud-awsspring-integration-aws

Spring cloud AWS kinesis stream binder failed to start due to improper bean initialization


I am trying to run this simple kinesis message consumer with the following code. This is the only class in application

I am facing this error since, I have updated to latest snapshot version of kinesis binder

    @SpringBootApplication
    @RestController
    @EnableBinding(Sink.class)
    @EnableAutoConfiguration
    public class ProducerApplication {



      public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
      }

      @StreamListener(Sink.INPUT)
      public void listen(String message) {
        System.out.println("Message has been received"+message);
      }

    }

Application yml

server.port: 8081


spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my.sink
          content-type: application/json




cloud:
  aws:
    region:
      static: us-east-1
    credentials:
      accessKey: <accessKey>
      secretKey: <secretKey>

build.gradle

     buildscript {
        ext {
            springBootVersion = '2.0.3.RELEASE'
        }
        repositories {
            mavenCentral()
            maven { url "https://repo.spring.io/snapshot" }
            maven { url "https://repo.spring.io/milestone" }
        }
        dependencies {
            classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        }
    }

    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    group = 'com.kinesis.demo'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = 1.8

    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }


    dependencies {
        compile('org.springframework.boot:spring-boot-starter')
        compile('org.springframework.boot:spring-boot-starter-web')
        compile('org.springframework.boot:spring-boot-starter-actuator')
        compile('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.BUILD-SNAPSHOT')
        testCompile('org.springframework.boot:spring-boot-starter-test')
    }

I am getting the bean initialzation exception and there seems to be a problem in creating bean DynamoDbMetadataStore.

2018-07-10 10:53:22.629  INFO 18332 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.lang.IllegalStateException: The component has not been initialized: DynamoDbMetadataStore{table={SpringIntegrationMetadataStore: {AttributeDefinitions: [{AttributeName: KEY,AttributeType: S}],TableName: SpringIntegrationMetadataStore,KeySchema: [{AttributeName: KEY,KeyType: HASH}],TableStatus: ACTIVE,CreationDateTime: Wed Jun 27 10:51:53 IST 2018,ProvisionedThroughput: {NumberOfDecreasesToday: 0,ReadCapacityUnits: 1,WriteCapacityUnits: 1},TableSizeBytes: 0,ItemCount: 0,TableArn: arn:aws:dynamodb:us-east-1:1234567:table/SpringIntegrationMetadataStore,TableId: d0cf588b-e122-406b-ad82-06255dfea6d4,}}, createTableRetries=25, createTableDelay=1, readCapacity=1, writeCapacity=1, timeToLive=null}.
 Is it declared as a bean? during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='my.sink', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.
Process will be retried on the next iteration.

This error started after updating to latest snapshot version of kinesis binder.

Can you please check if some thing is wrong.


Solution

  • I have just fixed the issue: https://github.com/spring-projects/spring-integration-aws/commit/fc34f814e557936d1bcb815d0879bd4f6e035675

    The problem was that when we have already a table in the DynamoDB, we just return from the afterPropertiesSet() leaving the initialized as false.

    The latest BUILD-SNAPSHOT should work now.