Search code examples
apache-kafka-streamswindowed

How to send record on topic when window is closed in kafka streams


So i have been struggeling with this for a couple of days, acctually. I am consuming records from 4 topics. I need to aggregate the records over a TimedWindow. When the time is up, i want to send either an approved message or a not approved message to a sink topic. Is this possible to do with kafka streams?

It seems it sinks every record to the new topic, even though the window is still open, and that's really not what i want.

Here is the simple code:

 builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(), 
 Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String, 
 FooTriggerMessage>("", Serdes.String(),
       fooTriggerSerde))
 .filter((key, value) -> value.getTriggerEventId() != null)
 .groupBy((key, value) -> value.getTriggerEventId().toString(),
       Serialized.with(Serdes.String(), fooTriggerSerde))

.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))

.aggregate(() -> new BarApprovalMessage(), /* initializer */
       (key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
       Materialized
               .<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
                       storeName) /* state store name */
               .withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(), 
Produced.with(windowedSerde, barApprovalSerde));

As of now, every record is being sinked to the outgoingTopic, i only want it to send one message when the window is closed, so to speak.

Is this possible?


Solution

  • You can use the Suppress functionality.

    From Kafka official guide:

    enter image description here https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results