I have a state store backed by a changelog topic. To limit the size of the changelog topic, I want to set cleanup.policy="delete,compact"
for the topic. However, when I create a brand new state store with
val builder = StreamsBuilder()
// Topic-level overrides for the changelog topic backing the store
val changelogConfigs = mapOf(
    "cleanup.policy" to "delete,compact",
    "retention.ms" to "86400000", // 1 day
)
builder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("state-store"),
        Serdes.String(),
        topicSerdeConfig.inputValueSerde
    ).withLoggingEnabled(changelogConfigs)
)
the configuration is not applied to the changelog topic. The changelog topic gets the following configuration no matter how I specify changelogConfigs:
Is this a bug, or am I doing something wrong?
I found the cause, and it is somewhat related to the article "Kafka Streams: Iterative Development and Blue-Green Deployment" by Brett Jordan (Airwallex Engineering, on Medium).
If I also changed the application.id when creating the state store, the changelog topic was configured properly.
If I changed only the state store name (effectively creating a new state store) and kept the same application.id, the changelog topic got the weird default configuration.
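For anyone checking the same thing: here is a minimal sketch of which changelog topic each combination maps to, assuming the usual Kafka Streams naming convention of <application.id>-<store name>-changelog. The application.ids and the "-v2" suffixes are made up for illustration, and changelogTopicName is a hypothetical helper, not part of the Kafka Streams API. It only tells you which topic to inspect; it does not explain why the configs were ignored.

```kotlin
// Hypothetical helper: builds the changelog topic name from the convention
// <application.id>-<store name>-changelog (assumed, see note above).
fun changelogTopicName(applicationId: String, storeName: String): String =
    "$applicationId-$storeName-changelog"

fun main() {
    // Original combination ("my-app" is a made-up application.id)
    println(changelogTopicName("my-app", "state-store"))
    // Only the store name changed
    println(changelogTopicName("my-app", "state-store-v2"))
    // Only the application.id changed
    println(changelogTopicName("my-app-v2", "state-store"))
}
```

Each of the three calls yields a different topic name, so when comparing configurations it is worth confirming that you are describing the topic that belongs to the combination you actually deployed.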