Search code examples
apache-kafka-streams

Not possible to configure changelog topics


I have a state store backed by a changelog topic. In order to limit the size of the changelog topic I wish to set cleanup.policy="delete,compact" for the topic. However, when I'm creating a brand new state store with

val builder = StreamsBuilder()

val changelogConfigs = mapOf(
        "cleanup.policy" to "delete,compact",
        "retention.ms" to "86400000", // 1 day
    )

    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("state-store"),
            Serdes.String(),
            topicSerdeConfig.inputValueSerde
        ).withLoggingEnabled(changeLogConfigs)
    )

the configuration is not applied to the changelog topic. The changelog topic gets the following configuration no matter how I specify changelogConfigs:

enter image description here enter image description here

Is this a bug, or I am doing something wrong?


Solution

  • I found the error, and it is somewhat related to Kafka Streams: Iterative Development and Blue-Green Deployment | by Brett Jordan | Airwallex Engineering | Medium

    If I also changed the application.id when creating the state store, the changelog topics were configured properly.

    If I only were to change the state store name, effectively making a new state store, and not change the application.id, the state store would be configured with the weird default configuration.