apache-kafka apache-kafka-streams

Is it possible to get the top 10 from a KTable/KStream?


I have a topic whose key is a String (the signal type) and whose value is a Signal, a class like this:

public class Signal {
  public final int deviceId;
  public final int value;
  ...
}

Each device can send signal values that rise or fall over time without any pattern.

Is it possible to get, for each type (the key of the topic), the top 10 devices by maximum signal value over the whole period of time, as a KTable<String,Signal>? Would it help if all signal values were only rising?

The topic structure can be changed if needed.


Solution

  • This is possible with Kafka Streams, for example in the case where values are always rising. You need to write your own Top10 aggregate, which stores the top 10 and updates it on each add call:

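        final var builder = new StreamsBuilder();
        final var topTable = builder
            // Read the topic: key = signal type, value = signal change
            .table(
                SignalChange.TOPIC_NAME,
                Consumed.with(Serdes.String(), new SignalChange.Serde())
            ).toStream()
            // One aggregate per signal type
            .groupByKey()
            .aggregate(
                () -> new Top10(),                // start with an empty top 10
                (k, v, top10) -> top10.add(v),    // fold each record into the top 10
                Materialized.with(Serdes.String(), new Top10.Serde())
            );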
    

    topTable can then be joined with any stream that requests the top 10.
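The answer does not show the Top10 class itself. As a rough illustration only, a minimal sketch of such an aggregate might look like the one below. Everything in it is an assumption: the field layout, the `entries()` helper, and the choice to keep the highest value seen per device are hypothetical, and `add` takes `deviceId` and `value` directly instead of a `SignalChange` so that the example has no Kafka dependency. The `Top10.Serde` used above is not covered here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Top10 aggregate (not part of Kafka Streams):
// keeps at most LIMIT devices, each with the highest value seen for it.
final class Top10 {
    static final int LIMIT = 10;

    // deviceId -> highest value seen so far for that device
    private final Map<Integer, Integer> best = new HashMap<>();

    // Folds one reading into the aggregate and returns this,
    // so it can be used as the result of an aggregator lambda.
    Top10 add(int deviceId, int value) {
        Integer current = best.get(deviceId);
        if (current != null) {
            // Device already tracked: keep its maximum.
            if (value > current) {
                best.put(deviceId, value);
            }
            return this;
        }
        if (best.size() < LIMIT) {
            best.put(deviceId, value);
            return this;
        }
        // Aggregate is full: find the weakest tracked device.
        Map.Entry<Integer, Integer> weakest = null;
        for (Map.Entry<Integer, Integer> e : best.entrySet()) {
            if (weakest == null || e.getValue() < weakest.getValue()) {
                weakest = e;
            }
        }
        // Replace it only if the newcomer is strictly higher.
        if (value > weakest.getValue()) {
            best.remove(weakest.getKey());
            best.put(deviceId, value);
        }
        return this;
    }

    // Snapshot of (deviceId, value) pairs, highest value first.
    List<Map.Entry<Integer, Integer>> entries() {
        List<Map.Entry<Integer, Integer>> out = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : best.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        out.sort(Map.Entry.<Integer, Integer>comparingByValue().reversed());
        return out;
    }
}
```

With a class along these lines, the aggregator in the topology would unpack the record, for instance `(k, v, top10) -> top10.add(v.deviceId, v.value)`, assuming the value type exposes those two fields as Signal does in the question.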