I have recently started learning Apache Kafka Streams and am playing with the word count example. Below is my code:
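import java.util.Properties;

import com.google.common.base.Splitter;               // Guava
import org.apache.commons.lang3.StringUtils;          // Apache Commons Lang 3
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class StreamsStarterApp {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> wordCountPipe = streamsBuilder.stream("word-count-in");
        wordCountPipe.filter((key, value) -> StringUtils.isNotBlank(value))   // drop empty records
                .mapValues(value -> value.toLowerCase())
                .flatMapValues(value -> Splitter.on(",").trimResults().split(value)) // one record per word
                .groupBy((key, value) -> value)   // re-key by word (requires a repartition)
                .count(Named.as("count"))         // Named names the processor node, not the state store
                .toStream()
                .to("word-count-out", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }
}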
An interesting observation: if I comment out '.mapValues(value -> value.toLowerCase())', the result is different. This really confuses me, because it seems that any change in the code leads to an unpredictable change in the result.
Send hello,hello to topic 'word-count-in'.
The result shows hello 2.
If I then comment out '.mapValues(value -> value.toLowerCase())', rerun the application and send hello,world, the result shows hello 1 world 1.
How can this happen? Is it related to the state store inside Kafka Streams?
Modifying a Kafka Streams application (i.e., removing or adding an operator) may result in incompatibilities. In general, you often need to reset the application (i.e., delete all its state) if you want to change the program (cf https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html).
For your particular case, the issue is operator names. Names are generated automatically using an internal counter to avoid naming conflicts. If you remove one operator, the names of all downstream operators change. Thus, the count() operator does not find its old state (each state store also has a name, and the name of the store changes, too), and you start with an empty state after you remove mapValues. That is why the second run reports hello 1 instead of hello 3 (the previous count of 2 plus the new occurrence).
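To make the counter effect concrete, here is a toy model (not Kafka's implementation) of default naming: each operator gets a type prefix plus the next value of one counter shared by the whole builder, zero-padded to 10 digits, which mimics names like KSTREAM-FILTER-0000000001 that appear in topology descriptions. The class `GeneratedNames` and the prefix lists are hypothetical; a real word-count topology also contains key-select and repartition nodes, so its actual numbers will differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model, NOT Kafka's implementation: every operator gets a default name
// built from a type prefix plus one counter shared by the whole builder,
// zero-padded to 10 digits.
final class GeneratedNames {

    // Assigns a default name to each operator prefix, in the order they are added.
    static List<String> assignNames(List<String> operatorPrefixes) {
        List<String> names = new ArrayList<>();
        int index = 0;
        for (String prefix : operatorPrefixes) {
            names.add(prefix + String.format("%010d", index++));
        }
        return names;
    }

    public static void main(String[] args) {
        // Illustrative prefixes only; the real topology has more nodes.
        List<String> withMapValues = assignNames(Arrays.asList(
                "KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-MAPVALUES-",
                "KSTREAM-FLATMAPVALUES-", "KSTREAM-AGGREGATE-STATE-STORE-"));
        List<String> withoutMapValues = assignNames(Arrays.asList(
                "KSTREAM-SOURCE-", "KSTREAM-FILTER-",
                "KSTREAM-FLATMAPVALUES-", "KSTREAM-AGGREGATE-STATE-STORE-"));

        // Removing one upstream operator shifts the store name by one.
        System.out.println(withMapValues.get(4));    // KSTREAM-AGGREGATE-STATE-STORE-0000000004
        System.out.println(withoutMapValues.get(3)); // KSTREAM-AGGREGATE-STATE-STORE-0000000003
    }
}
```

Every name after the removed operator moves, even though the count operator itself did not change.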
You can inspect the naming via Topology#describe(). This allows you to compare the topology before and after you change the code.
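One way to do that comparison is to save `streamsBuilder.build().describe().toString()` from each version of the code and check which state store names disappeared. The sketch below assumes processor lines in that human-readable output look like `Processor: NAME (stores: [STORE, ...])`; that format is meant for people, not a stable contract, so treat the parser as a convenience. `StoreNameDiff` is a hypothetical helper, and the sample lines in `main` are hand-written with illustrative numbers, not real output.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: compares the state store names found in two
// Topology#describe() outputs. Assumes processor lines look like
//   Processor: SOME-NAME (stores: [store-a, store-b])
final class StoreNameDiff {

    private static final Pattern STORES = Pattern.compile("\\(stores: \\[([^\\]]*)\\]\\)");

    // Collects every store name mentioned in a description string.
    static Set<String> storeNames(String description) {
        Set<String> names = new TreeSet<>();
        Matcher m = STORES.matcher(description);
        while (m.find()) {
            for (String name : m.group(1).split(",")) {
                String trimmed = name.trim();
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
        }
        return names;
    }

    // Stores present before the change but missing after it: a restarted
    // application would not find the state kept under those names.
    static Set<String> missingAfterChange(String before, String after) {
        Set<String> missing = new TreeSet<>(storeNames(before));
        missing.removeAll(storeNames(after));
        return missing;
    }

    public static void main(String[] args) {
        // Made-up sample lines; in practice use describe().toString() of each version.
        String before = "Processor: KSTREAM-AGGREGATE-0000000005 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000004])";
        String after = "Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])";
        System.out.println(missingAfterChange(before, after)); // [KSTREAM-AGGREGATE-STATE-STORE-0000000004]
    }
}
```

An empty result suggests the store names survived the change; a non-empty one means the affected operators will start from empty state unless you reset or name them explicitly.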
To allow for compatible upgrades, the DSL lets you specify names explicitly (cf https://docs.confluent.io/current/streams/developer-guide/dsl-topology-naming.html). This way, the names do not change. For the word-count example, you can name the count store via:
.count(Materialized.as("myName"))
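The effect of a fixed store name can be illustrated with a toy model (not Kafka code) that files persisted counts under a store name and keeps them across "restarts". It replays the question's two runs: once with a generated name that shifts between versions, and once with the same explicit name `myName` in both versions. The class `StoreNameSimulation` and the names `generated-store-4` / `generated-store-3` are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model, NOT Kafka code: persisted state is filed under a store name and
// survives restarts, and a count operator only sees the state filed under its
// current store name.
final class StoreNameSimulation {

    // store name -> (word -> count); stands in for local stores and changelogs.
    private final Map<String, Map<String, Long>> persistedStores = new HashMap<>();

    // Counts the comma-separated words of one input record into the named
    // store and returns that store's full contents.
    Map<String, Long> runCount(String storeName, String input) {
        Map<String, Long> store = persistedStores.computeIfAbsent(storeName, name -> new TreeMap<>());
        for (String word : input.split(",")) {
            store.merge(word.trim(), 1L, Long::sum);
        }
        return store;
    }

    public static void main(String[] args) {
        // Generated names: version 2 (without mapValues) gets a shifted store name.
        StoreNameSimulation generated = new StoreNameSimulation();
        generated.runCount("generated-store-4", "hello,hello");
        System.out.println(generated.runCount("generated-store-3", "hello,world")); // {hello=1, world=1}

        // Explicit name, as with Materialized.as("myName"): both versions use the same store.
        StoreNameSimulation explicit = new StoreNameSimulation();
        explicit.runCount("myName", "hello,hello");
        System.out.println(explicit.runCount("myName", "hello,world")); // {hello=3, world=1}
    }
}
```

The first case reproduces the hello 1 world 1 result from the question; with a stable name, the second run continues from the earlier count.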