java, apache-kafka, apache-kafka-streams, apache-kafka-connect, confluent-platform

Kafka Streams 2.6, Partition Assignor and Rebalancing Strategy


In my current Kafka version, which is 2.6, I am using the Streams API and I have a question. When a stream starts, it logs the Streams, Admin, Consumer and Producer configs. I noticed something strange: although I provide the configuration

streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

as shown above, I see different assignment strategies in the consumer and stream logs.
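
For reference, here is roughly how the stream is started; the application id, topic names, and the trivial topology are simplified placeholders, not my real setup:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class AssignorQuestionSketch {
    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "APPID");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // The line in question: an attempt to switch the partition assignor.
        streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        // The Streams, Admin, Consumer and Producer configs are logged at startup.
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
    }
}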

Here are the consumer logs that show the consumer configs:

2021-01-20 15:52:32.611  INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
allow.auto.create.topics = true
auto.commit.interval.ms = 500
auto.offset.reset = none
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-restore-consumer
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.CooperativeStickyAssignor]

but I also saw logs like the following:

2021-01-20 15:52:32.740  INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
allow.auto.create.topics = false
auto.commit.interval.ms = 500
auto.offset.reset = latest
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-consumer
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]

When I compare those two consumer logs, the only difference I notice is their client.id values.

I am a bit confused: did I enable the CooperativeStickyAssignor or not?

What is different between these two consumers that causes them to use different partition assignment strategies?

Is it normal to see different consumer configurations within the same Kafka Streams application?

Thank you


Solution

  • The first consumer log in your question is from a "restore" consumer, which Kafka Streams creates internally to recover state stores; you can find the word "restore" in its client.id. Its group.id is null because a restore consumer never joins a consumer group (its partitions are assigned manually), so the CooperativeStickyAssignor listed in its config is never actually used.
  • The second log is from the stream thread's main consumer (client.id ending in "-consumer"), which reads your input topics. For this consumer, Kafka Streams always overrides partition.assignment.strategy with its own StreamsPartitionAssignor; a user-supplied assignor is ignored. So no, you did not enable the CooperativeStickyAssignor, and you cannot for the main consumer of a Streams application.
  • Seeing different consumer configurations within the same Kafka Streams application is therefore normal. You also do not need the CooperativeStickyAssignor here: since Kafka 2.4 (KIP-429), StreamsPartitionAssignor itself performs incremental cooperative rebalancing.
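
If you want to tune the main and restore consumers independently, StreamsConfig provides prefix helpers for exactly this. Below is a minimal sketch; the values are illustrative (borrowed from your logs), not recommendations:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PrefixedConsumerConfigs {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "APPID");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX:9092");

        // "consumer." prefix: applied to the main, restore, and global consumers alike.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");

        // "restore.consumer." prefix: only the "...-restore-consumer" client.
        props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), "52428800");

        // "main.consumer." prefix: only the "...-consumer" client. Even with this
        // prefix, partition.assignment.strategy cannot be overridden: Kafka
        // Streams always installs StreamsPartitionAssignor for the main consumer.
        props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), "300000");

        System.out.println(props);
    }
}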