Tags: apache-flink, flink-sql

Apache Flink: Cannot resolve field in select clause


I'm just getting started with Apache Flink and I'm trying to aggregate some values ingested from a Kafka topic. Here's the code I'm using:

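import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class EnvironmentMeasuresJob {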
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.executeSql("CREATE TABLE EnvironmentMeasures (" +
                "`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp'," +
                "`area` STRING," +
                "`sensor` STRING," +
                "`co` DECIMAL(5, 2)," +
                "`pm1` DECIMAL(5, 2)," +
                "`pm25` DECIMAL(5, 2)," +
                "`pm10` DECIMAL(5, 2)," +
                "`temperature` DECIMAL(5, 2)," +
                "`pressure` DECIMAL(5, 2)," +
                "`humidity` DECIMAL(5, 2)," +
                "WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND" +
                ") WITH (" +
                "'connector' = 'kafka'," +
                "'topic' = 'seneca.environmentmeasures.raw'," +
                "'properties.bootstrap.servers' = 'localhost:9092'," +
                "'properties.group.id' = 'env-measures-job'," +
                "'scan.startup.mode' = 'earliest-offset'," +
                "'format' = 'json'," +
                "'json.fail-on-missing-field' = 'false'," +
                "'json.ignore-parse-errors' = 'false'" +
                ")");
        Table environmentMeasures = tEnv.from("EnvironmentMeasures");
        Table aggregatedEnvironmentMeasures = environmentMeasures
                .window(Slide.over(lit(20).seconds())
                        .every(lit(10).seconds())
                        .on($("timestamp"))
                        .as("w"))
                .groupBy($("sensor"), $("w"))
                .select(
                        $("w").end().as("timestamp"),
                        $("area"),
                        $("sensor"),
                        $("co").avg().as("averageCO"),
                        $("pm1").avg().as("averagePM1"),
                        $("pm25").avg().as("averagePM25"),
                        $("pm10").avg().as("averagePM10"),
                        $("temperature").avg().as("averageTemperature"),
                        $("pressure").avg().as("averagePressure"),
                        $("humidity").avg().as("averageHumidity")
                );
    }
}

But when I run the code, I get the following exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [area], input field list:[sensor, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7, EXPR$0]. 

If I remove "area" from the select, everything works. Any idea why this happens? Am I missing something? Thanks.


Solution

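  • You need to either group by area or compute some aggregation over the areas included in the given sensor and window. In a grouped window aggregation, every field in select() must be either a grouping key or the result of an aggregate function. area is neither: the query groups by sensor and w only, so one (sensor, window) group can contain rows from several areas, and Flink has no single area value to emit for it. That is also why the input field list in the error contains only the grouping key sensor plus the computed expressions (EXPR$n). Either add area to the grouping, e.g. .groupBy($("sensor"), $("area"), $("w")), or aggregate it, e.g. $("area").max().as("area").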
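The rule can be illustrated outside Flink with a small plain-Java sketch (Java 8+, no Flink dependency). It is an analogy for the Table API semantics, not Flink code: it uses a simplified, hypothetical Measure row with only sensor, area and co, ignores windowing, and mimics the two options with java.util.stream collectors, either grouping by sensor and area together or grouping by sensor alone and collapsing area with a max.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class GroupingSketch {

    // Hypothetical, simplified row: only the columns needed to illustrate the rule.
    public static final class Measure {
        final String sensor;
        final String area;
        final double co;

        public Measure(String sensor, String area, double co) {
            this.sensor = sensor;
            this.area = area;
            this.co = co;
        }
    }

    // Option 1: put area into the grouping key (analogous to groupBy(sensor, area, w)),
    // so every group has exactly one area value and it can be selected directly.
    public static Map<String, Double> avgCoBySensorAndArea(List<Measure> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                (Measure m) -> m.sensor + "|" + m.area,
                TreeMap::new,
                Collectors.averagingDouble((Measure m) -> m.co)));
    }

    // Option 2: keep grouping by sensor only and collapse area with an aggregate
    // (analogous to $("area").max()), so each group again yields a single area value.
    public static Map<String, String> maxAreaBySensor(List<Measure> rows) {
        BinaryOperator<String> keepMax = BinaryOperator.maxBy(Comparator.naturalOrder());
        return rows.stream().collect(Collectors.toMap(
                (Measure m) -> m.sensor,
                (Measure m) -> m.area,
                keepMax,
                TreeMap::new));
    }

    public static void main(String[] args) {
        // Sensor s1 reports from two areas, so grouping by sensor alone
        // leaves "the" area of group s1 undefined unless it is aggregated.
        List<Measure> rows = Arrays.asList(
                new Measure("s1", "north", 1.0),
                new Measure("s1", "south", 3.0),
                new Measure("s2", "east", 2.0));
        System.out.println(avgCoBySensorAndArea(rows));
        System.out.println(maxAreaBySensor(rows));
    }
}
```

Running main prints {s1|north=1.0, s1|south=3.0, s2|east=2.0} and then {s1=south, s2=east}. With area in the key, sensor s1 splits into two groups; with max, s1 stays one group but reports only one of its areas, which is the trade-off between the two fixes.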