Search code examples
springamazon-web-servicesspring-integrationspring-cloud-stream

spring cloud stream kinesis Binder


I am trying to implement a spring boot aws kinesis consumer that is capable of being auto-scaled in order to share the load (split processing shards) with the original instance.

What I have been able to do: using the well defined read me and examples available hereKinesis binder docs I have been able to start up multiple consumers that actually divide the shards for processing by supplying these properties.

on the producer, I supply partitionCount: 2 via an application property. and on the consumers, I supply both the instanceIndex and the instanceCount.

on consumer 1 i have instanceIndex=0 and instantCount =2 , on consumer 2 i have instanceIndex=1 and instantCount=2

this works fine and I have two spring boot applications dealing with their specific shards. But in this case, I have to have a pre-configured properties file per boot application that needs to be available upon load for them to split the load. and if I only start up the first consumer(non auto-scaled) I only process shards specific to index 0, leaving other shards unprocessed.

What I would like to do but not sure if it is possible is to have a single consumer deployed (that processes all shards). if I deploy another instance I would like that instance to relive the first consumer of some of the load, in other words, if I have 2 shards and one consumer it would process both, if I then deploy another app I would like that first consumer to now only processes from a single shard leaving the second shard to the second consumer.

I have tried to do this by not specifying instanceIndex or instanceCount on the consumers and only supplying the group name, but that leaves the second consumer idle until the first is shut down. FYI I have also created my own metadata and locking table, preventing the binder from creating the default ones.

Configurations: Producer -----------------

originator: KinesisProducer
server:
 port: 8090

    spring: 
      cloud: 
        stream: 
          bindings:
            output: 
              destination: <stream-name> 
              content-type: application/json
              producer: 
                headerMode: none
                partitionKeyExpression: headers.type

consumers-------------------------------------

originator: KinesisSink
server:
 port: 8091

spring:
  cloud:
    stream:
      kinesis:
        bindings:
          input:
            consumer:
              listenerMode: batch
              recordsLimit: 10
              shardIteratorType: TRIM_HORIZON
        binder:
          checkpoint:
            table: <checkpoint-table>
          locks:
            table: <locking-table
      bindings:
        input:
          destination: <stream-name>
          content-type: application/json
          consumer:
            concurrency: 1
            listenerMode: batch
            useNativeDecoding: true
            recordsLimit: 10
            idleBetweenPolls: 250
            partitioned: true
          group: mygroup

Solution

  • That’s correct. That’s how it works for now: if one consumer is there, it takes all the shards for processing. The second one will take an action only if the first one is broken somehow for at least one shard.

    The proper Kafka-like rebalancing is on our roadmap. We don’t have the solid vision yet, so issue on the matter and subsequent contribution are welcome!