Search code examples
javaapache-flinkflink-streamingflink-sql

Adding a column in Flink table


I'm trying to add a new column to a flink table in Java

Table table = tEnv.sqlQuery(query.getQuery());
table = table.addColumns($("NewColumn"));

but I'm running into this ValidationException:

org.apache.flink.table.api.ValidationException: Cannot resolve field [NewColumn], input field list:[ExistingColumn1, ExistingColumn2, ...].

I saw a similar example in the flink docs, so I'm not sure what I'm doing wrong here.

I tried running the code in the debugger and it seems to be failing in the resolve method


Solution

  • you are trying to add an existing column "NewColumn" as a new column to table! As this column does not exit yet it says, Cannot resolve field [NewColumn], input field list. Take a look at example from documents. Here the column "c" already exists in the table.

    Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc"));
    

    You should give an expression. and then can use .as() for column name.

    Table addColumns(Expression... fields);