We are using Kafka Streams via Spring Cloud Stream integration. I configured the replication factor to be used across all internal Kafka Streams topics by setting
spring.cloud.stream.kafka.streams.binder.configuration.replication.factor=${REPL_FACTOR}
It works for most repartition/changelog topics used internally by Kafka Streams. However, it looks like this setting has no effect on state store changelog topics that get created manually via Materialized#as(StoreSupplier)
. For those topics I can still see the replication factor is set to default 1. It's also not possible to set it using Materialized#withLoggingEnabled(Map<String, String>)
because this only accepts topic-level configs (replication.factor
is Streams config). Is this a known bug in Kafka Streams? I couldn't find anything. If so, is there a workaround to increase the replication factor for those changelog topics?
We are using Kafka v2.3.1 on the broker side and 2.5.0 on client side.
Starting with version 2.4, the AdminClient
can now set the replication factor to -1 in NewTopic
, meaning that the default.replication.factor
should be used when creating topics - KIP-464.
However, it appears that Kafka Streams does not currently use this feature; there is an open issuee KAFKA-8531 for this.
You can set the replication factor for internal topics using
StreamsConfig.REPLICATION_FACTOR_CONFIG)
https://kafka.apache.org/documentation/#replication.factor
The replication factor for change log topics and repartition topics created by the stream processing application.
Since you are setting that, via the binder config, it should work as expected.
EDIT
What version of spring-cloud-stream are you using? I just tested with 3.0.8 and it works as expected.
spring.cloud.stream.kafka.streams.binder.configuration.replication.factor: 3
2020-10-15 12:03:55,601 ERROR [kafka-stre] o.a.k.s.p.i.StreamThread:673 - stream-thread [kafka-streams-inventory-processor-b8d07a5a-f3c4-476a-a265-119163d2acb7-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: org.apache.kafka.streams.errors.StreamsException: Could not create topic kafka-streams-inventory-processor-inventory-counts-changelog.
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.