I need to query a state store for each value in a stream, where the store itself is built from those same values. I tried something like this:
final KStream<String, CustomDto> stream = builder.stream(INPUT_TOPIC_NAME);
final KTable<Windowed<CustomKey>, Long> table = stream
    .map((key, value) -> mapFunction(value)) // here we are getting our CustomKey and Long value
    .groupByKey(Grouped.with(new CustomKeySerde(), Serdes.Long()))
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(300)))
    .aggregate(
        () -> 0L,
        (aggKey, newValue, aggValue) -> aggValue + newValue,
        // Materialized.with(...) is a static factory, so chaining it after as(...) drops the
        // store name; use the instance methods withKeySerde/withValueSerde instead.
        Materialized.<CustomKey, Long, WindowStore<Bytes, byte[]>>as(STORE_NAME)
            .withKeySerde(new CustomKeySerde())
            .withValueSerde(Serdes.Long()));
I need to be able to query the resulting store for each CustomDto object that passes through the input topic. I tried doing it inside mapFunction, but the store does not seem to be available there, even if I add it with builder.addStateStore beforehand.
How do I even approach this? I read the docs, but I haven't found anything that would help my case.
You will need to use the Processor API in order to access state stores.
https://kafka.apache.org/33/documentation/streams/developer-guide/processor-api.html
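As a rough illustration of what that could look like, here is a minimal sketch, assuming Kafka Streams 3.3+ (where KStream#process returns a stream). It is not the question's exact topology: a String key stands in for CustomKey, the stream is assumed to be already keyed, and the topic names, the store name, and the LookupProcessor class are made up for the example. The processor is connected to the store materialized by the aggregation by passing the store name to process(), and it reads the sum for the sliding window that ends at the current record's timestamp.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;

final class StoreLookupSketch {
    // All names below are placeholders for this sketch.
    static final String INPUT_TOPIC_NAME = "input-topic";
    static final String OUTPUT_TOPIC_NAME = "output-topic";
    static final String STORE_NAME = "sums-store";
    static final Duration WINDOW = Duration.ofSeconds(300);

    /** For each record, forwards the sum stored for the window ending at the record's timestamp. */
    static final class LookupProcessor implements Processor<String, Long, String, Long> {
        private ProcessorContext<String, Long> context;
        private TimestampedWindowStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            // Only works because STORE_NAME is passed to process() below.
            this.store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, Long> record) {
            // The sliding window that ends at this record starts WINDOW earlier.
            long windowStart = Math.max(0L, record.timestamp() - WINDOW.toMillis());
            ValueAndTimestamp<Long> stored = store.fetch(record.key(), windowStart);
            Long sum = (stored == null) ? Long.valueOf(0L) : stored.value();
            context.forward(record.withValue(sum));
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Assumed to be keyed already; in the question this is the output of map(...).
        KStream<String, Long> keyed =
            builder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.String(), Serdes.Long()));

        // Same aggregation as in the question, materialized under STORE_NAME.
        keyed.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW))
            .aggregate(
                () -> 0L,
                (key, value, agg) -> agg + value,
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(STORE_NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));

        // Connect the processor to the store by name so getStateStore() can find it.
        ProcessorSupplier<String, Long, String, Long> supplier = LookupProcessor::new;
        keyed.process(supplier, STORE_NAME)
            .to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

Since a state store is local to the task that owns a partition, a lookup only sees entries for keys on that same partition. In the question's topology the processor would therefore presumably have to be attached after the re-keying step (for example after an explicit repartition() following the map), not inside mapFunction. Whether the fetched sum already includes the current record depends on the order in which the aggregation and the processor receive it, so that is worth checking with a TopologyTestDriver test.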