Let's just hypothetically say groupby function was not available in kafka streams. Can I do the below to get the word count and build a KTable on top of it? Please note that i use "word-count-topic" twice in the topology. I have a use case where I want to build something iteratively and for the next stream event, I want to look up previous value and update it based on the event. I want to keep the latest value in the same topic on which I build Ktable.
KTable<String,Long> wordCountTable = builder.table("word-count-topic",Consumed.with(Serdes.String(), Serdes.Long()));
KStream<String,String> wordsStream = builder.stream("words-topic",Consumed.with(Serdes.String(), Serdes.String()));
KStream<String,String> msgStream = wordsStream
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.selectKey((k,v) -> v);
msgStream.leftJoin(kTable, (word,count) -> {
if( count == null) return new WordCount(word, Long.valueOf(1));
else return new WordCount(word, count + 1);
})
.mapValues((k,v)-> v.getCount())
.to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
streams = new KafkaStreams(builder.build(), props);
streams.start();
That should work. Why not just run the code?