I am on Kafka 2.6 and using the Streams API, and I have a question. When I start a stream, it logs the Streams, Admin, Consumer, and Producer configs. I noticed something strange: although I set the configuration
streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
as shown above, I see different strategies in the consumer and stream logs.
Here is a consumer log that shows the consumer config:
2021-01-20 15:52:32.611 INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 500
auto.offset.reset = none
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-restore-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
But I also saw a log like the one below:
2021-01-20 15:52:32.740 INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 500
auto.offset.reset = latest
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
When I compared these two consumer logs, the only difference I noticed was their client.id values.
I am a bit confused: did I enable CooperativeStickyAssignor or not?
What is the difference between these two consumers that causes them to use different partition assignment strategies?
Is it normal to see different consumer configurations in the same Kafka Streams application?
Thank you
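The first consumer log in your question belongs to a "restore" consumer, which handles state store recovery. You can see the word "restore" in its client.id. Its group.id is null, so it does not take part in consumer group rebalancing, and the partition.assignment.strategy printed there has no practical effect. The second consumer log belongs to the main consumer of the same stream thread (client.id ending in "-consumer"), which is the one that reads your input topics. Kafka Streams sets the strategy of this consumer to StreamsPartitionAssignor itself, overriding the value you passed in, so CooperativeStickyAssignor is not actually in use. Seeing several consumers with different configurations in one Kafka Streams application is normal.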
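The two dumps differ in more than client.id, which is easy to miss when comparing them by eye. As a minimal sketch, a small helper can parse the "key = value" lines of two logged dumps and list only the keys whose values differ. The class ConfigDumpDiff and its methods are hypothetical names made up for this illustration and are not part of Kafka; the sample lines in main are a shortened copy of the logs from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper (not a Kafka class): compares two logged config dumps.
public class ConfigDumpDiff {

    // Parse "key = value" lines into a map. Lines without " =" (for example
    // the "ConsumerConfig values:" header) are skipped. An empty value such
    // as "client.rack =" is stored as an empty string.
    public static Map<String, String> parse(String dump) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String line : dump.split("\\R")) {
            int idx = line.indexOf(" =");
            if (idx <= 0) {
                continue;
            }
            String key = line.substring(0, idx).trim();
            String value = line.substring(idx + 2).trim();
            out.put(key, value);
        }
        return out;
    }

    // Return every key whose value differs between the two dumps, sorted by
    // key. A key missing from one dump is reported with the value "<absent>".
    public static Map<String, String[]> diff(Map<String, String> a, Map<String, String> b) {
        Set<String> keys = new TreeSet<>(a.keySet());
        keys.addAll(b.keySet());
        Map<String, String[]> out = new TreeMap<>();
        for (String key : keys) {
            String left = a.getOrDefault(key, "<absent>");
            String right = b.getOrDefault(key, "<absent>");
            if (!left.equals(right)) {
                out.put(key, new String[] {left, right});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Shortened copies of the two dumps from the question.
        String restoreConsumer = String.join("\n",
            "allow.auto.create.topics = true",
            "auto.offset.reset = none",
            "check.crcs = true",
            "group.id = null",
            "partition.assignment.strategy = [org.apache.kafka.clients.consumer.CooperativeStickyAssignor]");
        String mainConsumer = String.join("\n",
            "allow.auto.create.topics = false",
            "auto.offset.reset = latest",
            "check.crcs = true",
            "partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]");

        diff(parse(restoreConsumer), parse(mainConsumer))
            .forEach((key, values) ->
                System.out.println(key + ": " + values[0] + " -> " + values[1]));
    }
}
```

Pasting the full dumps into the two strings should show every setting that differs between the restore consumer and the main consumer, not only partition.assignment.strategy.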