
Apache Kafka ksqlDB doesn't emit as time changes


I'm new to ksqlDB, so this could be something simple I'm not getting. I've read all the docs I could find and haven't found the answer, so I hope I can get some help here!

I'm trying to build a message delivery queue in which each message contains a delivery time, after which I'd like to emit the message to a response topic.

So it goes like this:

CREATE STREAM feedback_topic
WITH (
    KAFKA_TOPIC = 'feedback_topic',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1,
    REPLICAS = 1
) AS SELECT time, data
FROM input_topic
WHERE time > UNIX_TIMESTAMP()
EMIT CHANGES;

I was hoping this query would get re-evaluated as UNIX_TIMESTAMP() changes, but no luck.

I could understand it as a reasonable trade-off if the authors of ksqlDB chose to re-evaluate whether to emit only when a row in the input topic changes, and not when the value of UNIX_TIMESTAMP() changes. Nevertheless, I'd like to have this functionality.

I can imagine making a timer topic that gets the current time pushed to it every second to trigger re-evaluation, although I'd be more than happy to find a built-in way to do this.

Thanks in advance!


Solution

  • I was hoping this query would get re-evaluated as UNIX_TIMESTAMP() changes, but no luck.

    Yeah, that's not how ksqlDB works. When the message is consumed from the source topic it's evaluated against the predicate at that point, and that point only.

    I guess you could create a "timer topic" like you say, and do a stream-stream join to trigger a reevaluation of the predicate, but this is not built into the product.

    You could log an issue to suggest this improvement.
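
For what it's worth, the timer-topic workaround could look roughly like the sketch below. It is untested and rests on several assumptions that are not in the question: a hypothetical timer_ticks topic that some external producer writes to about once per second, a hypothetical now_ms field holding that producer's clock in epoch milliseconds, and a hypothetical tick_key column that carries the same constant value in both input_topic and timer_ticks so the two streams can be joined. The 1 HOURS window is arbitrary and caps how far ahead a message can be scheduled.

```sql
-- ASSUMPTION: 'timer_ticks' is a hypothetical topic fed by an external
-- producer roughly once per second; ksqlDB does not generate it for you.
-- ASSUMPTION: tick_key holds the same constant value (e.g. 'all') on every
-- record of both streams, so every message can match every tick.
CREATE STREAM timer_ticks (
    tick_key VARCHAR,
    now_ms   BIGINT   -- producer's wall clock, epoch milliseconds
) WITH (
    KAFKA_TOPIC = 'timer_ticks',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1
);

-- Each arriving tick is joined against the buffered messages, so the
-- predicate is checked again on every tick rather than once on arrival.
CREATE STREAM feedback_topic
WITH (
    KAFKA_TOPIC = 'feedback_topic',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1,
    REPLICAS = 1
) AS SELECT
    i.tick_key AS tick_key,        -- join key is kept in the projection
    i.time     AS delivery_time,
    i.data     AS data
FROM input_topic i
JOIN timer_ticks t WITHIN 1 HOURS  -- arbitrary cap on scheduling horizon
ON i.tick_key = t.tick_key
WHERE t.now_ms >= i.time           -- delivery time has been reached
  AND t.now_ms <  i.time + 1000    -- match only the first tick after it
EMIT CHANGES;
```

The second half of the WHERE clause is there because, without it, a due message would be emitted again on every later tick inside the join window. With ticks that are not exactly one second apart, a message could still match zero or two ticks, so downstream consumers would probably need to tolerate a missed or duplicated delivery.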