apache-kafka, apache-kafka-streams

Is it possible to query against an aggregated store in the same topology in Kafka Streams?


I have a case where I need to query each stream value against a store that is built from those same values. I tried something like this:

final KStream<String, CustomDto> stream = builder.stream(INPUT_TOPIC_NAME);
final KTable<Windowed<CustomKey>, Long> table = stream
    .map((key, value) -> mapFunction(value)) // here we are getting our CustomKey and Long value
    .groupByKey(Grouped.with(new CustomKeySerde(), Serdes.Long()))
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(300)))
    .aggregate(
        () -> 0L,
        (aggKey, newValue, aggValue) -> aggValue + newValue,
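        // Materialized.with(...) is a static factory, so chaining it after as(...) would drop the store name.
        Materialized.<CustomKey, Long, WindowStore<Bytes, byte[]>>as(STORE_NAME)
            .withKeySerde(new CustomKeySerde())
            .withValueSerde(Serdes.Long()));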

I need to be able to query the resulting store for each CustomDto object that is passed through the input topic. I tried doing it inside the mapFunction, but it seems that the store is unavailable from there even if I add it with builder.addStateStore beforehand. How do I even approach this? I read the docs, but I haven't found anything that would help my case.


Solution

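  • Stateless DSL operators such as map() cannot access state stores. To read a store while handling each record, use the Processor API: implement a Processor, look the store up with context.getStateStore(...) in init(), and attach the processor with KStream#process(), passing the store name so that the store is connected to it.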

    https://kafka.apache.org/33/documentation/streams/developer-guide/processor-api.html
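
As a rough illustration of that approach, here is a minimal sketch, assuming Kafka Streams 3.3 or later (where KStream#process returns a KStream). It is not the asker's exact topology: instead of the DSL sliding-window aggregate, a single processor maintains a plain window store by hand and reads it back for every record. The class name, topic names, and store name are hypothetical, and String keys stand in for CustomKey.

```java
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class StoreLookupSketch {
    // Hypothetical names, chosen only for this sketch.
    static final String STORE_NAME = "rolling-sum-store";
    static final Duration WINDOW = Duration.ofSeconds(300);

    /** Adds each value to the store, then emits the sum over the last WINDOW for that key. */
    static class RollingSumProcessor implements Processor<String, Long, String, Long> {
        private ProcessorContext<String, Long> context;
        private WindowStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            // Only works because STORE_NAME is passed to process(...) in buildTopology().
            this.store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, Long> record) {
            long ts = record.timestamp();
            // Merge with any value already stored for this key at the same timestamp.
            Long existing = store.fetch(record.key(), ts);
            store.put(record.key(), (existing == null ? 0L : existing) + record.value(), ts);

            // Query the store: sum every entry for this key within the last WINDOW.
            long from = Math.max(0L, ts - WINDOW.toMillis());
            long sum = 0L;
            try (WindowStoreIterator<Long> it = store.fetch(
                    record.key(), Instant.ofEpochMilli(from), Instant.ofEpochMilli(ts))) {
                while (it.hasNext()) {
                    sum += it.next().value;
                }
            }
            context.forward(record.withValue(sum));
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Register the store; retention is kept longer than the window being queried.
        builder.addStateStore(Stores.windowStoreBuilder(
            Stores.inMemoryWindowStore(STORE_NAME, WINDOW.multipliedBy(2), WINDOW, false),
            Serdes.String(), Serdes.Long()));

        KStream<String, Long> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Long()));
        // Passing STORE_NAME here is what connects the store to the processor.
        KStream<String, Long> sums = input.process(RollingSumProcessor::new, STORE_NAME);
        sums.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

Note that the store is partitioned by record key. If the lookup key is derived from the value, as with mapFunction in the question, the stream would likely need to be re-keyed and repartitioned (for example with map() followed by repartition()) before process(), so that all records for one key reach the same store instance.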