Search code examples
apache-flink

How to get array element from array column


I am using Flink 1.12 and I have following simple code to demonstrate the usage of array type column.

I want to get the second element in the favorites array column, but when I run the run the application, following exception throws:

Type is not supported: ANY
org.apache.flink.table.api.TableException: Type is not supported: ANY
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:576)
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType(FlinkTypeFactory.scala)
    at org.apache.flink.table.planner.operations.PlannerQueryOperation.lambda$new$0(PlannerQueryOperation.java:55)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Collections$2.tryAdvance(Collections.java:4717)

The application code:

  test("test java po schema") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)
    val ds: DataStream[(String, Timestamp, Double, Seq[String])] = env.fromElements(("123", new Timestamp(new Date().getTime), 12.4, Seq("x", "y", "z")))
    val tenv = StreamTableEnvironment.create(env)
    val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"favorites")

    table.printSchema()

    tenv.createTemporaryView("xxx", table)

    //I want to query the second element in the favorites array column
    tenv.sqlQuery("select favorites[1] from xxx").toAppendStream[Row].print()


    env.execute()

  }

Solution

  • This should work if the column is actually an Array -- it can't be a Java List or a Scala Seq.