apache-kafka, kafka-consumer-api, apache-kafka-streams, ktable

Can I use a topic more than once in a Kafka Streams topology?


Let's say, hypothetically, that the groupBy function were not available in Kafka Streams. Could I do the following to get the word count and build a KTable on top of it? Note that I use "word-count-topic" twice in the topology: once as the source of the KTable and once as the output of the join. My use case is to build a value iteratively: for each new stream event, I want to look up the previous value and update it based on the event, keeping the latest value in the same topic that backs the KTable.

KTable<String,Long> wordCountTable = builder.table("word-count-topic",Consumed.with(Serdes.String(), Serdes.Long()));

KStream<String,String> wordsStream = builder.stream("words-topic",Consumed.with(Serdes.String(), Serdes.String()));

KStream<String,String> msgStream = wordsStream
                                   .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                                   .selectKey((k,v) -> v);

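// Join each word against its current count; count is null when the word has no entry yet.
msgStream.leftJoin(wordCountTable, (word, count) -> {
              if (count == null) return new WordCount(word, Long.valueOf(1));
              else return new WordCount(word, count + 1);
          })
          .mapValues((k, v) -> v.getCount())
          // Write the updated count back to the topic that backs wordCountTable.
          .to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));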

streams = new KafkaStreams(builder.build(), props);
streams.start();

Solution

  • That should work: the same topic can serve as the source of the KTable and as the output of the join. The simplest check is to run the code.
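
As a rough illustration of the per-record logic the topology in the question is aiming for (look up the previous count, add one, write it back), here is a plain-Java sketch with no Kafka dependency. The class name `WordCountLoopSketch`, the helper `process`, and the use of a `Map` in place of the KTable are all assumptions made for illustration; they are not part of the Kafka Streams API. One difference to keep in mind: the map below is updated immediately, whereas in a real Kafka Streams application the table is updated only after the record has been written to "word-count-topic" and read back, so a lookup may see an older count than the one most recently produced.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for the lookup-then-update loop; no Kafka involved.
final class WordCountLoopSketch {

    // Hypothetical helper: applies the join logic from the question to one text line.
    // `table` plays the role of the KTable built on "word-count-topic".
    static void process(String textLine, Map<String, Long> table) {
        for (String word : textLine.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue; // split can yield an empty leading token
            }
            Long previous = table.get(word);                       // the "left join" lookup
            long updated = (previous == null) ? 1L : previous + 1; // same branch as in the question
            table.put(word, updated);                              // the write back to the topic
        }
    }

    public static void main(String[] args) {
        Map<String, Long> table = new LinkedHashMap<>();
        process("Hello Kafka", table);
        process("hello streams", table);
        System.out.println(table); // prints {hello=2, kafka=1, streams=1}
    }
}
```

If the real topology produces lower counts than this sketch for words that repeat in quick succession, the asynchronous table update described above is a likely place to look first.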