Search code examples
apache-kafkaapache-kafka-streamsspring-cloud-stream

Kafka Streams replication factor not applied to state store changelog topics


We are using Kafka Streams via Spring Cloud Stream integration. I configured the replication factor to be used across all internal Kafka Streams topics by setting

spring.cloud.stream.kafka.streams.binder.configuration.replication.factor=${REPL_FACTOR}

It works for most repartition/changelog topics used internally by Kafka Streams. However, it looks like this setting has no effect on state store changelog topics that get created manually via Materialized#as(StoreSupplier). For those topics I can still see the replication factor is set to default 1. It's also not possible to set it using Materialized#withLoggingEnabled(Map<String, String>) because this only accepts topic-level configs (replication.factor is Streams config). Is this a known bug in Kafka Streams? I couldn't find anything. If so, is there a workaround to increase the replication factor for those changelog topics?

We are using Kafka v2.3.1 on the broker side and 2.5.0 on client side.


Solution

  • Starting with version 2.4, the AdminClient can now set the replication factor to -1 in NewTopic, meaning that the default.replication.factor should be used when creating topics - KIP-464.

    However, it appears that Kafka Streams does not currently use this feature; there is an open issuee KAFKA-8531 for this.

    You can set the replication factor for internal topics using

    StreamsConfig.REPLICATION_FACTOR_CONFIG)
    

    https://kafka.apache.org/documentation/#replication.factor

    The replication factor for change log topics and repartition topics created by the stream processing application.

    Since you are setting that, via the binder config, it should work as expected.

    EDIT

    What version of spring-cloud-stream are you using? I just tested with 3.0.8 and it works as expected.

    spring.cloud.stream.kafka.streams.binder.configuration.replication.factor: 3
    

    2020-10-15 12:03:55,601 ERROR [kafka-stre] o.a.k.s.p.i.StreamThread:673 - stream-thread [kafka-streams-inventory-processor-b8d07a5a-f3c4-476a-a265-119163d2acb7-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: org.apache.kafka.streams.errors.StreamsException: Could not create topic kafka-streams-inventory-processor-inventory-counts-changelog.

    Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.