
Kafka Streams - Using An Existing State Store After Adding a New Source Stream


I have an existing stream which uses two topics as its source:

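import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._ // Kafka < 2.7: org.apache.kafka.streams.scala.Serdes._

val streamsBuilder = new StreamsBuilder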
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")

stream1
  .merge(stream2)
  .groupByKey
  .reduce(reduceValues)
  .toStream
  .to("result-topic")

The auto-generated name of the StateStore is KSTREAM-REDUCE-STATE-STORE-0000000003.
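One way to check which names Kafka Streams generated is to print the topology description. This is a minimal sketch, assuming String keys and values, a hypothetical keep-the-latest reducer standing in for reduceValues, a hypothetical object name, and Kafka 2.7+ (older versions import the implicit serdes from org.apache.kafka.streams.scala.Serdes instead):

```scala
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._

object DescribeOriginalTopology {
  // Assumption: String keys/values; the question keeps K and V generic.
  def build(): String = {
    val streamsBuilder = new StreamsBuilder
    val stream1 = streamsBuilder.stream[String, String]("topic1")
    val stream2 = streamsBuilder.stream[String, String]("topic2")

    stream1
      .merge(stream2)
      .groupByKey
      .reduce((_, latest) => latest) // hypothetical reducer: keep the latest value
      .toStream
      .to("result-topic")

    // describe() lists every processor and state store with its generated name
    streamsBuilder.build().describe().toString
  }

  def main(args: Array[String]): Unit = println(build())
}
```

Running the same check against the three-source version shows whether the store name you pinned is the one actually used.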

Now I need to add one more topic as a source. However, adding a new source increments Kafka's internal naming counter, so the StateStore becomes KSTREAM-REDUCE-STATE-STORE-0000000005. I don't want to lose the current state, so I explicitly provide the name of the old StateStore:

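import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.Materialized
import org.apache.kafka.streams.scala.serialization.Serdes._ // Kafka < 2.7: org.apache.kafka.streams.scala.Serdes._

val streamsBuilder = new StreamsBuilder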
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic

stream1
  .merge(stream2)
  .merge(stream3) // merge new topic
  .groupByKey
  .reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")) // pin the old store name
  .toStream
  .to("result-topic")
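The jump from 0000000003 to 0000000005 can be reproduced with a deliberately simplified model. This is NOT Kafka code: it only assumes that each source and each merge() takes the next value of one shared counter, that groupByKey takes none (no repartitioning needed), and that the reduce store takes the value after that, zero-padded to ten digits. The object name is hypothetical.

```scala
object NameIndexModel {
  // Simplified model of generated names, not the real Kafka Streams implementation.
  def reduceStoreName(sourceCount: Int): String = {
    require(sourceCount >= 1, "need at least one source")
    val sources = sourceCount    // sources take indices 0 .. sourceCount - 1
    val merges = sourceCount - 1 // one merge() per additional source
    val storeIndex = sources + merges
    f"KSTREAM-REDUCE-STATE-STORE-$storeIndex%010d"
  }

  def main(args: Array[String]): Unit = {
    println(reduceStoreName(2)) // original topology: topic1 + topic2
    println(reduceStoreName(3)) // new topology: topic1 + topic2 + topic3
  }
}
```

Under the same model, index 3 in the three-source topology is taken by the first merge(), not by a store.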

It seems to work, but I'm not sure if I'm interfering with the Kafka internals because:

  1. I'm using a custom name that has the same form as a name Kafka would auto-generate (could this cause a name conflict?)
  2. The set of streams feeding this StateStore is different from what it was initially.

Any comments?


Solution

  • To be honest, the safest option would be to give this store a human-readable name but, as you mentioned, you would then lose the existing state.

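    I assume there shouldn't be any problem with what you did (at least until you introduce another change to the topology :)). Because the store keeps its name, its changelog topic (<application.id>-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog) stays the same too, which is why the existing state is picked up. In the new topology, index 0000000003 goes to the first merge operator (KSTREAM-MERGE-0000000003; groupByKey does not consume an index when no repartitioning is needed), and processors and stores carry different name prefixes, so there won't be a conflict (although I am not 100% sure about Kafka Streams internals there).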

    There is also the Application Reset Tool, which lets you reprocess your input topics from the beginning and rebuild the aggregations. I don't know if it is applicable to your case, though: the retention policy on your input topics might prevent the tool from regenerating exact aggregates.
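If you do go the reset route (for example, running kafka-streams-application-reset with --application-id and --input-topics, plus your broker address option; check --help for your version), the local state directory also has to be wiped before restarting. A minimal sketch, assuming hypothetical application.id and broker values and a hypothetical object name:

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

object RestartAfterReset {
  // Hypothetical values: use the same application.id you passed to the reset tool.
  def props(): Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p
  }

  // Deletes local store files so state is rebuilt from the (reset) input topics
  // rather than restored from stale local data, then starts processing.
  def startFresh(topology: Topology): KafkaStreams = {
    val streams = new KafkaStreams(topology, props())
    streams.cleanUp() // only allowed before start() or after close()
    streams.start()
    streams
  }
}
```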