I am using Flink 1.12, and I have the following simple code to demonstrate the use of an array-type column.
I want to get the second element of the favorites array column, but when I run the application, the following exception is thrown:
Type is not supported: ANY
org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:576)
at org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType(FlinkTypeFactory.scala)
at org.apache.flink.table.planner.operations.PlannerQueryOperation.lambda$new$0(PlannerQueryOperation.java:55)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Collections$2.tryAdvance(Collections.java:4717)
The application code:
test("test java po schema") {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  env.setParallelism(1)
  val ds: DataStream[(String, Timestamp, Double, Seq[String])] =
    env.fromElements(("123", new Timestamp(new Date().getTime), 12.4, Seq("x", "y", "z")))
  val tenv = StreamTableEnvironment.create(env)
  val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"favorites")
  table.printSchema()
  tenv.createTemporaryView("xxx", table)
  // I want to query the second element in the favorites array column
  tenv.sqlQuery("select favorites[1] from xxx").toAppendStream[Row].print()
  env.execute()
}
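This should work if the column is actually an Array, for example Array[String]. It can't be a Java List or a Scala Seq: Flink appears to treat those as a generic (ANY) type rather than a SQL ARRAY, which matches the exception above. Note also that SQL array indexes start at 1, so the second element is favorites[2], not favorites[1].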
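As a minimal sketch of that change, the version below declares the column as Array[String] instead of Seq[String]. The object name ArrayColumnExample and the favorites value are made up for illustration, and the imports assume the Flink 1.12 Scala DataStream and Table bridge APIs. I have not run this against every planner configuration, so treat it as a starting point rather than a confirmed fix.

```scala
import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical example object; the name is not from the original post.
object ArrayColumnExample {
  // Convert the Seq to an Array up front so the column can be derived
  // as an array type instead of a generic one.
  val favorites: Array[String] = Seq("x", "y", "z").toArray

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Same tuple as in the question, but the last field is an Array[String].
    val ds: DataStream[(String, Timestamp, Double, Array[String])] =
      env.fromElements(("123", new Timestamp(System.currentTimeMillis()), 12.4, favorites))

    val tenv = StreamTableEnvironment.create(env)
    val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"favorites")
    table.printSchema()
    tenv.createTemporaryView("xxx", table)

    // SQL array indexes are 1-based, so favorites[2] selects the second element.
    tenv.sqlQuery("select favorites[2] from xxx").toAppendStream[Row].print()
    env.execute()
  }
}
```

If the data arrives as a Seq or a java.util.List from upstream code, converting it with .toArray in a map step before calling fromDataStream should have the same effect.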