Search code examples
spring-cloud-streamspring-cloud-awsspring-integration-aws

Spring Cloud Aws kinesis Binder load balancing


I was trying to implement the load balancing for Aws kinesis stream consumers

As per documentation I am trying to implement

spring:
  cloud:
    stream:
      instanceIndex: 1
      instanceCount: 3
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-kinesis-stream
          content-type: application/json

I have 3 containers, I want to bring up new containers(At max 6) if needed and without restarting the existing.

  1. The instanceIndex starts from 0 or 1.
  2. If I give the instanceCount as 6 but bring up only three instances, will all the messages are consumed until I bring up new instances.
  3. In documentation, there is a property called spring.cloud.stream.bindings..consumer.concurrency, Can you help the importance of it.
  4. For some reasons, if any of the instance goes down, will any of the messages be unconsumed.

Can you please help us


Solution

  • The spring.cloud.stream.bindings..consumer.concurrency is an internal option per consumer:

    adapter.setConcurrency(properties.getConcurrency());
    

    ...

    /**
     * The maximum number of concurrent {@link ConsumerInvoker}s running.
     * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
     * Messages from within the same shard will be processed sequentially.
     * In other words each shard is tied with the particular thread.
     * By default the concurrency is unlimited and shard
     * is processed in the {@link #consumerExecutor} directly.
     * @param concurrency the concurrency maximum number
     */
    public void setConcurrency(int concurrency) {
    

    so, this does nothing with your distributed solution.

    The instanceIndex and instanceCount works like this in the Binder:

        if (properties.getInstanceCount() > 1) {
            shardOffsets = new HashSet<>();
            KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
            List<Shard> shards = kinesisConsumerDestination.getShards();
            for (int i = 0; i < shards.size(); i++) {
                // divide shards across instances
                if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
                    KinesisShardOffset shardOffset = new KinesisShardOffset(
                            kinesisShardOffset);
                    shardOffset.setStream(destination.getName());
                    shardOffset.setShard(shards.get(i).getShardId());
                    shardOffsets.add(shardOffset);
                }
            }
        }
    

    So, every consumer gets a sub-set of shards in the stream. Therefore if you have more shards, than instances you may end up with the fact that some shards are not consumed.

    There is nothing to consume messages from the same shard concurrently: only one thread can consume one shard per cluster.