apache-kafka apache-kafka-streams

Kafka Streams conditional suppress


I have a stateless topology that consumes an entity from a topic (let's call it Input), analyses it, derives a result (let's call it Status: ACTIVE, HALTED, or DELETED), and produces that result to another topic.

Now I have a requirement not to produce the result immediately when the Status is HALTED, because another Input might arrive and change it back to ACTIVE, and the consumers do not care about these short transitions. So I should wait a few minutes and only send the HALTED Status if it has not changed back to ACTIVE in the meantime.

I am wondering whether it's possible to implement this by grouping by key, then using a reduce operation that simply keeps the latest Status, combined with a custom suppress similar to Suppressed.untilTimeLimit that only suppresses results with status HALTED.

I tried to do it using the Suppressed interface, but it looks like a marker interface: SuppressedInternal must actually be used, and it exposes a TimeDefinition that is package-private, so I can't hack around it. Is there a workaround?


Solution

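  • I think your best bet here is to use the Kafka Streams Processor API, or the KStream.process method (which mixes the Processor API into the KStream DSL), together with a state store. That way, you have complete control over what you put into the store and when you emit a record. For example, you could buffer HALTED results in the store and use a scheduled punctuator to forward only those that have not been superseded once the waiting period has passed.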
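
To make the idea concrete, here is a minimal sketch of the buffering logic such a processor might implement. It is written in plain Java with no Kafka dependency so that it runs on its own. All names (HaltedDelaySketch, ConditionalDelay, onRecord, onPunctuate) are hypothetical. In a real topology, onRecord would roughly correspond to the processor's process method, onPunctuate to a punctuator registered through the processor context's schedule method, the map to a key-value state store, and the emitted list to forwarding records downstream. Treat it as one possible shape under those assumptions, not as the definitive implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HaltedDelaySketch {

    enum Status { ACTIVE, HALTED, DELETED }

    /** Hypothetical stand-in for a processor plus its state store. */
    static final class ConditionalDelay {
        private final long delayMs;
        // key -> timestamp of the first still-pending HALTED (a state store in Kafka Streams)
        private final Map<String, Long> pendingHalted = new LinkedHashMap<>();
        // stands in for forwarding records downstream
        private final List<String> emitted = new ArrayList<>();

        ConditionalDelay(long delayMs) {
            this.delayMs = delayMs;
        }

        /** Called once per incoming result. */
        void onRecord(String key, Status status, long timestampMs) {
            if (status == Status.HALTED) {
                // Buffer instead of emitting; keep the first timestamp so that
                // repeated HALTED results do not extend the waiting period.
                pendingHalted.putIfAbsent(key, timestampMs);
            } else {
                // Any other status cancels a pending HALTED and is emitted at once.
                // Note: this may re-emit ACTIVE after a suppressed HALTED.
                pendingHalted.remove(key);
                emitted.add(key + "=" + status);
            }
        }

        /** Called periodically; emits HALTED results that have waited long enough. */
        void onPunctuate(long nowMs) {
            Iterator<Map.Entry<String, Long>> it = pendingHalted.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (nowMs - entry.getValue() >= delayMs) {
                    emitted.add(entry.getKey() + "=" + Status.HALTED);
                    it.remove();
                }
            }
        }

        List<String> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        ConditionalDelay delay = new ConditionalDelay(5 * 60_000L); // wait 5 minutes
        delay.onRecord("a", Status.HALTED, 0L);       // buffered
        delay.onRecord("a", Status.ACTIVE, 60_000L);  // cancels the pending HALTED
        delay.onRecord("b", Status.HALTED, 60_000L);  // buffered
        delay.onPunctuate(120_000L);                  // nothing is due yet
        delay.onPunctuate(360_000L);                  // "b" has waited 5 minutes
        System.out.println(delay.emitted());          // prints [a=ACTIVE, b=HALTED]
    }
}
```

In an actual Kafka Streams application the pending entries would need to live in a fault-tolerant state store rather than an in-memory map, and you would have to choose between wall-clock time and stream time for the punctuation, depending on whether the delay should elapse even when no new input arrives.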