I'm using confluent to write a query to get the first timestamp in a 5 minute window of a kafka topic. Here's the query (I know it's not the pretty way to do it):
CREATE STREAM start_metric_value AS
select metric_value
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;
but I have this error :
Code generation failed for Predicate: Can't find any functions with the name 'MIN'. expression:(METRIC_DATETIME_UTC = MIN(TIMESTAMPTOSTRING(METRIC_DATETIME_UTC, 'yyyy-MM-dd HH:mm:ss'))), schema:
ROWKEY
STRING KEY,ID
STRING,METRIC_NAME
STRING,METRIC_VALUE
STRING,METRIC_DATETIME_UTC
BIGINT,METRIC_INDEX
STRING,IANA_TIMEZONE
STRING,PROCESSED_DATETIME_UTC
BIGINT,DATA_TYPE
STRING,ASSET_TYPE
STRING,ROWTIME
BIGINT,ROWKEY
STRING Caused by: Can't find any functions with the name 'MIN'
can any one know how to solve this problem
Not 100% clear about what you're trying to achieve. See comment above on your question about adding more details to help people understand what you're trying to achieve.
That said, I can say....
The Min
function is not being recognised for two reasons:
TIMESTAMPTOSTRING
to MIN
, but MIN
does not take a string.WHERE
clause.The error message you're seeing looks like a bug. If it still exists on the latest version of ksqlDB you may want to raise an issue in the ksqlDB GitHub project.
Even correcting these two things you're query will still fail as windowing in ksqlDB requires an aggregation, so you'll need a GROUP BY
.
If, for example, you wanted to capture the min metric_datetime_utc
per metric_value
for each 5 minute window, you could do so with:
CREATE TABLE start_metric_value AS
SELECT
metric_value,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY metric_value;
This will create a windowed table, i.e. a table where the key is made up of metric_value
and the WINDOWSTART
time. minTs
will store the minimum datetime seen.
Let's run some data through the query to understand what's happening:
Input:
rowtime | metric_value | metric_datetime_utc
--------|---------------|--------------------
1 | A | 3
2 | A | 4
3 | A | 2
4 | B | 5
300000 | A | 6
Output to the START_METRIC_VALUE
topic might be (Note: metric_Value and windowStart will be stored in the Kafka record's key, while minTs will be in the value):
metric_value | windowStart | minTs
-------------|-------------|------
A | 0 | 3
A | 0 | 3
A | 0 | 2
B | 0 | 5
A | 300000 | 6
What is actually output to the topic will depend on your value of cache.max.bytes.buffering
. Setting this to 0
, turning off buffering, will see the above output. However, with buffering enabled some of the intermediate results may not be output to Kafka, though the final result for each window will remain the same. You can also control what is output to Kafka using the upcoming SUPPRESS functionality
The above solution gives you the min timestamp per metric_value. If you want a global minimum datetime seen per window, then you can GROUP BY
a constant. Note, this routes all events to a single ksqlDB node, so it doesn't scale well as a solution. If scaling is an issue there are solutions, e.g. like first calculating the minimum metric_value
and then post-processing this to find the global minimum.
CREATE TABLE start_metric_value AS
SELECT
1 as Key,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY 1;
Note: syntax is correct for version 0.10 of ksqlDB. You may need to adjust for other versions.