Flink 1.4: Column 'rowtime' not found in any table


I am following the documentation to configure a TableSource with a rowtime attribute.

I register the timestamp field as follows:

    KafkaTableSource source = Kafka08JsonTableSource.builder()
            // set Kafka topic
            .forTopic("alerting")
            // set Kafka consumer properties
            .withKafkaProperties(getKafkaProperties())
            // set Table schema
            .withSchema(TableSchema.builder()
                    .field("tenant", Types.STRING())
                    .field("message", Types.STRING())
                    .field("frequency", Types.LONG())
                    .field("timestamp", Types.SQL_TIMESTAMP()).build())
            .failOnMissingField(true)
            .withRowtimeAttribute(
                    // "timestamp" is rowtime attribute
                    "timestamp",
                    // value of "timestamp" is extracted from existing field with same name
                    new ExistingField("timestamp"),
                    // values of "timestamp" are at most out-of-order by one day
                    new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1)))
            .build();

    // register the "alerting" topic as the table "kafka"
    tEnv.registerTableSource("kafka", source);

    Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " +
            "FROM kafka " +
            "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");

    tEnv.toAppendStream(results, Row.class).print();

and get the following error:

    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
        at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
        at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
        at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62)
    Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)


Solution

  • The field in your table kafka is called timestamp, not rowtime: the rowtime attribute gets the name you pass as the first argument to withRowtimeAttribute(). So you have to refer to the attribute by its name, timestamp, instead of rowtime.

    Note that TIMESTAMP is a keyword in SQL, so you should either rename the timestamp attribute or escape the attribute name with backticks (`):

    tEnv.sqlQuery(
      "SELECT tenant, message, SUM(frequency) " +
      "FROM kafka " +
      "GROUP BY HOP(`timestamp`, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");
    

    Btw., a BoundedOutOfOrderTimestamps bound of one day is quite a lot. It can cause significant processing latency and large state, because the query collects data for one day before it starts emitting results and discarding state.
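To make the latency point concrete, here is a simplified plain-Java model. It is not Flink code, and the class and method names are hypothetical. It assumes a watermark that trails the highest timestamp seen so far by a fixed delay (maxTimestamp - delay, which is how BoundedOutOfOrderTimestamps behaves), and that an event-time window [start, end) fires once the watermark reaches end - 1. It ignores details such as periodic watermark emission, so treat it as an approximation.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of a bounded-out-of-orderness watermark and
 * of when an event-time window can fire. Not Flink code; names are made up.
 */
public class WatermarkDelaySketch {

    /** Tracks the highest timestamp seen and derives watermark = maxTimestamp - delay. */
    static final class BoundedOutOfOrderWatermark {
        private final long delayMillis;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrderWatermark(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        void observe(long timestampMillis) {
            maxTimestamp = Math.max(maxTimestamp, timestampMillis);
        }

        long currentWatermark() {
            // Avoid underflow before any record has been observed.
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - delayMillis;
        }
    }

    /** A window [start, end) is treated as complete once the watermark reaches end - 1. */
    static boolean windowCanFire(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    /** Event time the stream must reach before the window fires: end - 1 + delay. */
    static long eventTimeNeededToFire(long windowEndMillis, long delayMillis) {
        return windowEndMillis - 1 + delayMillis;
    }

    public static void main(String[] args) {
        long windowEnd = TimeUnit.SECONDS.toMillis(5); // e.g. a 5-second HOP window [0s, 5s)
        long oneDay = TimeUnit.DAYS.toMillis(1);
        long thirtySeconds = TimeUnit.SECONDS.toMillis(30);

        BoundedOutOfOrderWatermark dayBound = new BoundedOutOfOrderWatermark(oneDay);
        BoundedOutOfOrderWatermark secondsBound = new BoundedOutOfOrderWatermark(thirtySeconds);

        // Event time has advanced one minute past the end of the window.
        long latestEvent = windowEnd + TimeUnit.MINUTES.toMillis(1);
        dayBound.observe(latestEvent);
        secondsBound.observe(latestEvent);

        // prints false: the 1-day watermark is still far behind the window end
        System.out.println(windowCanFire(windowEnd, dayBound.currentWatermark()));
        // prints true: the 30-second watermark (35000 ms) has passed the window end
        System.out.println(windowCanFire(windowEnd, secondsBound.currentWatermark()));
        // prints 86404999: event time needed before the window fires with a 1-day bound
        System.out.println(eventTimeNeededToFire(windowEnd, oneDay));
    }
}
```

In this model, with a one-day bound the window only fires once event time is about a day past the window end, and its state has to be kept until then. With the 30-second bound mentioned in the original code comment, one minute of event time after the window end is already enough.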