
Why is the MIN function not recognized by ksql?


I'm using Confluent to write a query that gets the first timestamp in each 5-minute window of a Kafka topic. Here's the query (I know it's not the prettiest way to do it):

CREATE STREAM start_metric_value AS
select metric_value 
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;

but I get this error:

Code generation failed for Predicate: Can't find any functions with the name 'MIN'. expression:(METRIC_DATETIME_UTC = MIN(TIMESTAMPTOSTRING(METRIC_DATETIME_UTC, 'yyyy-MM-dd HH:mm:ss'))), schema:ROWKEY STRING KEY, ID STRING, METRIC_NAME STRING, METRIC_VALUE STRING, METRIC_DATETIME_UTC BIGINT, METRIC_INDEX STRING, IANA_TIMEZONE STRING, PROCESSED_DATETIME_UTC BIGINT, DATA_TYPE STRING, ASSET_TYPE STRING, ROWTIME BIGINT, ROWKEY STRING Caused by: Can't find any functions with the name 'MIN'

Does anyone know how to solve this problem?


Solution

  • It's not 100% clear what you're trying to achieve. See the comment above on your question asking you to add more details, so people can understand what you're after.

    That said, I can tell you the following.

    The MIN function is not being recognised for two reasons:

    • You're passing the output of TIMESTAMPTOSTRING to MIN, but MIN does not take a string.
    • You can't use an aggregate function in a WHERE clause.

    The error message you're seeing looks like a bug. If it still occurs on the latest version of ksqlDB, you may want to raise an issue in the ksqlDB GitHub project.

    Even after correcting these two things, your query will still fail: windowing in ksqlDB requires an aggregation, so you'll need a GROUP BY.

    If, for example, you wanted to capture the minimum metric_datetime_utc per metric_value for each 5-minute window, you could do so with:

    CREATE TABLE start_metric_value AS
      SELECT
        metric_value,
        MIN(metric_datetime_utc) as minTs
      FROM dataaggregaion 
      WINDOW TUMBLING (SIZE 5 MINUTE)
      GROUP BY metric_value;
    

    This will create a windowed table, i.e. a table whose key is made up of metric_value and the WINDOWSTART time. minTs stores the minimum metric_datetime_utc seen for that key in that window.
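    To make the window key concrete, here is a toy Python model (not ksqlDB code) of how a record's ROWTIME could map to a WINDOWSTART for a 5-minute tumbling window. It assumes millisecond timestamps and windows aligned to the Unix epoch, which as far as I know is how Kafka Streams tumbling windows behave; the function names are hypothetical.

```python
# Toy model, not ksqlDB's implementation: assumes ROWTIME is in epoch
# milliseconds and that tumbling windows are aligned to the epoch.
WINDOW_SIZE_MS = 5 * 60 * 1000  # WINDOW TUMBLING (SIZE 5 MINUTE)


def window_start(rowtime_ms, size_ms=WINDOW_SIZE_MS):
    """Start (inclusive) of the tumbling window that contains rowtime_ms."""
    return rowtime_ms - rowtime_ms % size_ms


def table_key(metric_value, rowtime_ms):
    """Hypothetical model of the windowed table key: (metric_value, WINDOWSTART)."""
    return (metric_value, window_start(rowtime_ms))


if __name__ == "__main__":
    print(table_key("A", 1))        # ('A', 0)
    print(table_key("A", 299999))   # ('A', 0)
    print(table_key("A", 300000))   # ('A', 300000)
```

    Records with the same metric_value but different windows therefore end up as different rows in the table.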

    Let's run some data through the query to understand what's happening:

    Input:

    rowtime | metric_value  | metric_datetime_utc
    --------|---------------|--------------------
     1      |  A            | 3
     2      |  A            | 4
     3      |  A            | 2
     4      |  B            | 5
     300000 |  A            | 6
    

    Output to the START_METRIC_VALUE topic might be as follows (note: metric_value and windowStart are stored in the Kafka record's key, while minTs is in the value):

    metric_value | windowStart | minTs 
    -------------|-------------|------
     A           | 0           | 3
     A           | 0           | 3
     A           | 0           | 2
     B           | 0           | 5
     A           | 300000      | 6
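    The output above can be reproduced with a small Python model of the aggregation with caching disabled, where every input record emits the updated row for its key. This is an illustrative sketch, not how ksqlDB is implemented; `windowed_min_changelog` is a hypothetical name.

```python
# Toy model (not ksqlDB code) of the windowed MIN aggregation with record
# caching disabled: every input record emits the updated row for its key.
WINDOW_SIZE_MS = 5 * 60 * 1000  # WINDOW TUMBLING (SIZE 5 MINUTE)


def windowed_min_changelog(records, size_ms=WINDOW_SIZE_MS):
    """records: iterable of (rowtime, metric_value, metric_datetime_utc).
    Returns the emitted (metric_value, window_start, min_ts) updates."""
    state = {}   # (metric_value, window_start) -> current minimum
    output = []
    for rowtime, metric_value, metric_ts in records:
        start = rowtime - rowtime % size_ms  # epoch-aligned window start
        key = (metric_value, start)
        current = state.get(key)
        state[key] = metric_ts if current is None else min(current, metric_ts)
        output.append((metric_value, start, state[key]))
    return output


if __name__ == "__main__":
    sample = [(1, "A", 3), (2, "A", 4), (3, "A", 2), (4, "B", 5), (300000, "A", 6)]
    for row in windowed_min_changelog(sample):
        print(row)
```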
    

    What is actually written to the topic depends on your value of cache.max.bytes.buffering. Setting it to 0 turns off buffering and produces the output above. With buffering enabled, some of the intermediate results may not be written to Kafka, though the final result for each window remains the same. You can also control what is written to Kafka using the upcoming SUPPRESS functionality.
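    Roughly speaking, caching collapses repeated updates to the same key between flushes, so only the latest value per key is forwarded. The toy model below assumes a single flush after all five records have arrived, a hypothetical simplification (as I understand it, real flushes depend on the cache size and the commit interval). It shows that the surviving row for each window matches the last row per key in the unbuffered output.

```python
# Toy model of record caching (hypothetical simplification): updates are
# buffered per key and only the latest one per key is forwarded on flush.
# A single flush is assumed after all updates have arrived.


def flush_once(updates):
    """updates: (metric_value, window_start, min_ts) rows in arrival order.
    Returns the rows forwarded by one flush: the last update for each key."""
    latest = {}
    for metric_value, start, min_ts in updates:
        # Overwriting keeps the key's original insertion position in the dict.
        latest[(metric_value, start)] = min_ts
    return [(mv, start, min_ts) for (mv, start), min_ts in latest.items()]


if __name__ == "__main__":
    unbuffered = [("A", 0, 3), ("A", 0, 3), ("A", 0, 2), ("B", 0, 5), ("A", 300000, 6)]
    print(flush_once(unbuffered))
    # [('A', 0, 2), ('B', 0, 5), ('A', 300000, 6)]
```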

    The above solution gives you the minimum timestamp per metric_value. If you want a global minimum datetime per window, you can GROUP BY a constant. Note that this routes all events to a single ksqlDB node, so it doesn't scale well. If scaling is an issue there are alternatives, e.g. first calculating the minimum per metric_value, as above, and then post-processing that to find the global minimum.

    CREATE TABLE start_metric_value AS
      SELECT
        1 as Key,
        MIN(metric_datetime_utc) as minTs
      FROM dataaggregaion 
      WINDOW TUMBLING (SIZE 5 MINUTE)
      GROUP BY 1;
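    The two ways of getting a global minimum can be compared with a toy Python model (not ksqlDB code; the function names are hypothetical): grouping every record under one constant key, versus first computing per-metric_value minimums (which can be spread across nodes because they are keyed by metric_value) and then reducing those to one minimum per window. Both give the same answer, because the minimum of the per-group minimums is the overall minimum.

```python
# Toy comparison of two ways to get a global minimum per tumbling window.
# Records are (rowtime_ms, metric_value, metric_datetime_utc).
WINDOW_SIZE_MS = 5 * 60 * 1000  # SIZE 5 MINUTE


def _start(rowtime_ms):
    """Epoch-aligned start of the window containing rowtime_ms."""
    return rowtime_ms - rowtime_ms % WINDOW_SIZE_MS


def global_min_constant_key(records):
    """Like GROUP BY 1: every record lands on the same key per window."""
    result = {}
    for rowtime, _value, ts in records:
        start = _start(rowtime)
        result[start] = min(result.get(start, ts), ts)
    return result


def per_key_min(records):
    """Stage 1: minimum per (metric_value, window), like the first query."""
    result = {}
    for rowtime, value, ts in records:
        key = (value, _start(rowtime))
        result[key] = min(result.get(key, ts), ts)
    return result


def global_min_two_stage(records):
    """Stage 2: post-process the per-key minimums into one value per window."""
    result = {}
    for (_value, start), ts in per_key_min(records).items():
        result[start] = min(result.get(start, ts), ts)
    return result


if __name__ == "__main__":
    sample = [(1, "A", 3), (2, "A", 4), (3, "A", 2), (4, "B", 5), (300000, "A", 6)]
    print(global_min_constant_key(sample))  # {0: 2, 300000: 6}
    print(global_min_two_stage(sample))     # {0: 2, 300000: 6}
```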
    

    Note: the syntax above is correct for version 0.10 of ksqlDB. You may need to adjust it for other versions.