apache-kafka apache-kafka-streams

Kafka Streams: Top N words in word count


I have implemented the well-known word count example using Kafka Streams:

KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines    
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value)    
    .count();

Now I would like to export the top N words with the highest count to a new topic. What would be the best way to do that?


Solution

  • You could filter all counts above some threshold.

    wordCounts.toStream().filter((key, value) -> value > N).to("new-topic", Produced.with(stringSerde, Serdes.Long()));

    But this only returns the words whose count is greater than N, not the N words with the highest counts.


    Or you can query the state store using Interactive Queries, find the total number of entries in the store, then sort the data and grab the "top N".

    This is what is done in the KafkaMusicExample to find "top 5 songs played" in a simulated record player.
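
    The sort-and-take step can be sketched in plain Java. This is only a sketch under assumptions: the class `TopNBySorting` and its `topN` method are hypothetical names, and the `Map` stands in for the entries you would have read out of the word-count state store. It does not call any Kafka Streams API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class TopNBySorting {
    private TopNBySorting() {}

    // counts is an assumed stand-in for the entries read from the state store.
    static List<Map.Entry<String, Long>> topN(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted((a, b) -> {
                    // Highest count first.
                    int byCount = Long.compare(b.getValue(), a.getValue());
                    // Break ties alphabetically so the result is deterministic.
                    return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
                })
                // A non-positive n yields an empty list.
                .limit(Math.max(n, 0))
                .collect(Collectors.toList());
    }
}
```

    Sorting every entry costs O(M log M) for M distinct words, which is usually fine for an occasional query but gets expensive if the store is large and queried often.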


    Feel free to check out the Confluent examples repo as well, which has some examples of TopN implementations using a PriorityQueue, such as this one.
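
    To illustrate the PriorityQueue idea on its own, here is a minimal plain-Java sketch. It is not the code from the Confluent examples: `TopNWithHeap` and `topN` are hypothetical names, and the input `Map` is an assumed stand-in for the current word counts. It keeps a min-heap of at most N entries, so the smallest of the current top N is always the one evicted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper for illustration; not taken from the Confluent examples.
final class TopNWithHeap {
    private TopNWithHeap() {}

    // Returns up to n words, highest count first. Order among equal counts is unspecified.
    static List<String> topN(Map<String, Long> counts, int n) {
        List<String> result = new ArrayList<>();
        if (n <= 0) {
            return result;
        }
        // Min-heap by count: the head is the weakest member of the current top N.
        Comparator<Map.Entry<String, Long>> byCount =
                (a, b) -> Long.compare(a.getValue(), b.getValue());
        PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(n, byCount);
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            heap.offer(entry);
            if (heap.size() > n) {
                heap.poll(); // evict the entry with the smallest count
            }
        }
        // Polling yields ascending counts, so reverse to get highest first.
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey());
        }
        Collections.reverse(result);
        return result;
    }
}
```

    Compared with sorting everything, the heap never holds more than N + 1 entries, so the work is O(M log N) for M distinct words.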