java, apache-kafka, apache-kafka-streams

How to implement a Kafka Streams topology that processes a single topic with both an interactive-query store and a global store


I am trying to implement a Kafka Streams application that treats a single topic as a global database and also supports interactive queries. So I want to have:

  1. a global store for the records (GlobalKTable, KeyValueStore)

  2. a queryable store that lets me get the result of an interactive query (the maximum)

The interactive query has to compute the global maximum of one of the record's fields:

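 KStream<String, TercUnitRecord> recordsStream = topologyBuilder.stream(topicName);
 KTable<String, Long> lastUpdateStore = recordsStream
                 // Keep only the field whose maximum is needed.
                 .mapValues(record -> record.getLastUpdate())
                 // Re-key everything to one constant key so a single reduce sees all values.
                 .selectKey((key, value) -> "lastdate")
                 .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                 // compareTo only guarantees the sign of its result, so test "> 0" rather than "== 1".
                 .reduce((maxValue, currValue) -> maxValue.compareTo(currValue) > 0 ? maxValue : currValue,
                         Materialized.as("terc-lastupdate"));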

However, I am facing the problem that I cannot use the same topic as a source more than once within one Kafka Streams instance. I have done some research, and the only way I found to do this is to run multiple KafkaStreams instances, but I am not sure that is the correct and only way to achieve this. Any ideas?


Solution

  • I used a separate KafkaStreams instance for each task (one for the global store, one for the maximum), and it worked properly.
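
A minimal sketch of what this could look like, not the poster's actual code. It makes several assumptions: the topic values are simplified to plain Long timestamps (the original uses TercUnitRecord, which would need its own serde plus the mapValues step from the question), and the topic name "terc-units", the store name "terc-global", the application ids and the broker address are all hypothetical. Each instance gets its own topology and its own application.id, so the two read the topic independently.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class TwoInstancesSketch {

    // Topology 1: materialize the whole topic as a global store.
    static Topology globalStoreTopology(String topic) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable(topic,
                Consumed.with(Serdes.String(), Serdes.Long()),
                Materialized.as("terc-global")); // hypothetical store name
        return builder.build();
    }

    // Topology 2: re-key all values to one constant key and keep the maximum.
    static Topology maxTopology(String topic) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(topic, Consumed.with(Serdes.String(), Serdes.Long()))
                .selectKey((key, value) -> "lastdate")
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce((max, curr) -> max.compareTo(curr) > 0 ? max : curr,
                        Materialized.as("terc-lastupdate"));
        return builder.build();
    }

    // Each instance needs a distinct application.id (assumed values below).
    static Properties config(String applicationId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        return props;
    }

    public static void main(String[] args) {
        String topic = "terc-units"; // hypothetical topic name

        KafkaStreams globalApp =
                new KafkaStreams(globalStoreTopology(topic), config("terc-global-app"));
        KafkaStreams maxApp =
                new KafkaStreams(maxTopology(topic), config("terc-max-app"));

        globalApp.start();
        maxApp.start();

        // Close both instances when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            globalApp.close();
            maxApp.close();
        }));
    }
}
```

Once maxApp is running, the maximum should be readable from the "terc-lastupdate" store under the key "lastdate"; in Kafka Streams 2.5 and later this goes through KafkaStreams#store with StoreQueryParameters.fromNameAndType and QueryableStoreTypes.keyValueStore().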