I am trying to implement load balancing for AWS Kinesis stream consumers.
Following the documentation, I have this configuration:
spring:
  cloud:
    stream:
      instanceIndex: 1
      instanceCount: 3
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-kinesis-stream
          content-type: application/json
I have 3 containers. I want to bring up new containers (6 at most) when needed, without restarting the existing ones.
Can you please help?
The spring.cloud.stream.bindings..consumer.concurrency property is an internal option per consumer:
adapter.setConcurrency(properties.getConcurrency());
...
/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {
So this option has nothing to do with your distributed solution.
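To illustrate what the Javadoc above describes, here is a minimal sketch of shards being pinned to a bounded set of threads inside a single consumer instance. It is not the adapter's code: the class ShardConcurrencySketch, the method process, and the round-robin assignment of shards to threads are assumptions made for the example. It only shows why messages from one shard stay in order while different shards can run in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the Kinesis binder or adapter.
class ShardConcurrencySketch {

    // Handles messagesPerShard[shard][n] with at most `concurrency` threads and
    // returns, per shard, the order in which its messages were handled.
    static Map<Integer, List<String>> process(String[][] messagesPerShard, int concurrency) {
        // One single-threaded executor stands in for each "invoker".
        List<ExecutorService> invokers = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            invokers.add(Executors.newSingleThreadExecutor());
        }
        Map<Integer, List<String>> handled = new ConcurrentHashMap<>();
        for (int shard = 0; shard < messagesPerShard.length; shard++) {
            final int shardId = shard;
            handled.put(shardId, new CopyOnWriteArrayList<>());
            // Assumed round-robin: every shard is tied to exactly one thread.
            ExecutorService invoker = invokers.get(shard % concurrency);
            for (String message : messagesPerShard[shard]) {
                invoker.execute(() -> handled.get(shardId).add(message));
            }
        }
        for (ExecutorService invoker : invokers) {
            invoker.shutdown();
            try {
                invoker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        String[][] messages = {{"a1", "a2", "a3"}, {"b1", "b2"}, {"c1"}};
        // Three shards shared by two threads; order within each shard is kept.
        System.out.println(process(messages, 2));
    }
}
```

All of this happens inside one JVM, which is why the setting does not change how shards are split between containers.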
The instanceIndex and instanceCount properties work like this in the binder:
if (properties.getInstanceCount() > 1) {
    shardOffsets = new HashSet<>();
    KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
    List<Shard> shards = kinesisConsumerDestination.getShards();
    for (int i = 0; i < shards.size(); i++) {
        // divide shards across instances
        if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
            KinesisShardOffset shardOffset = new KinesisShardOffset(
                    kinesisShardOffset);
            shardOffset.setStream(destination.getName());
            shardOffset.setShard(shards.get(i).getShardId());
            shardOffsets.add(shardOffset);
        }
    }
}
So every consumer gets a subset of the shards in the stream. Therefore, if you have more shards than running instances (for example, when instanceCount is set higher than the number of instances actually started), you may end up with some shards not being consumed.
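The modulo rule in the binder snippet can be tried in isolation. The following is a small sketch that reproduces only the `i % instanceCount == instanceIndex` check on plain shard positions. The class ShardAssignmentSketch and the methods shardsFor and unconsumed are hypothetical names introduced for the example, and the shard counts are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the binder's modulo rule, not binder code.
class ShardAssignmentSketch {

    // Returns the 0-based shard positions that one instance would pick up.
    static List<Integer> shardsFor(int shardCount, int instanceCount, int instanceIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < shardCount; i++) {
            if (i % instanceCount == instanceIndex) {
                assigned.add(i);
            }
        }
        return assigned;
    }

    // Returns the shard positions nobody consumes when only some indexes are running.
    static Set<Integer> unconsumed(int shardCount, int instanceCount, Set<Integer> runningIndexes) {
        Set<Integer> orphaned = new TreeSet<>();
        for (int i = 0; i < shardCount; i++) {
            if (!runningIndexes.contains(i % instanceCount)) {
                orphaned.add(i);
            }
        }
        return orphaned;
    }

    public static void main(String[] args) {
        // 6 shards, instanceCount = 3, all three instances running.
        for (int index = 0; index < 3; index++) {
            System.out.println("instance " + index + " -> " + shardsFor(6, 3, index));
        }
        // 6 shards, instanceCount = 6, but only indexes 0..2 were started.
        System.out.println("unconsumed: " + unconsumed(6, 6, Set.of(0, 1, 2)));
    }
}
```

With these made-up numbers, instance 0 gets shards [0, 3] when instanceCount is 3 but only [0] when instanceCount is 6, so each instance's share depends on the instanceCount it was configured with. Note also that the check compares against a 0-based index.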
There is no way to consume messages from the same shard concurrently: only one thread per cluster can consume a given shard.