scala, apache-flink, flink-streaming, flink-sql

Flink Table API & SQL and map types (Scala)


I am using Flink's Table API and/or Flink SQL support (Flink 1.3.1, Scala 2.11) in a streaming environment. I'm starting with a DataStream[Person], where Person is a case class that looks like:

case class Person(name: String, age: Int, attributes: Map[String, String])
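The table registration looks roughly like this (sketched here for context; the actual source and timestamp/watermark assignment are omitted):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val streamTableEnvironment = TableEnvironment.getTableEnvironment(env)

// a stream of Person with timestamps and watermarks already assigned
val persons: DataStream[Person] = ???

// expose the stream as "myTable" with an event-time attribute named rowtime,
// matching the field names used in the query below
streamTableEnvironment.registerDataStream(
  "myTable", persons, 'name, 'age, 'attributes, 'rowtime.rowtime)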

All is working as expected until I start to bring attributes into the picture.

For example:

val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)

... leads to:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
  at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
  at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
  at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531)
  at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:530)
  at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
  at com.nordstrom.mdt.Job$.main(Job.scala:112)
  at com.nordstrom.mdt.Job.main(Job.scala)

Note: this error occurs whether or not the specific map key is present. Also note that if I do not specify a map key at all, I get a different error which makes sense; that scenario is not at play here.

This PR seems to say that there's a path forward: https://github.com/apache/flink/pull/3767. Looking specifically at the test case, it suggests that type information is possible with DataSets. However, neither of the relevant methods, fromDataStream and registerDataStream, offers a way to provide type information.

Is this possible? In other words, can Flink SQL on Streams support maps?

Clarifying edit... When omitting the map key (GROUP BY ... attributes rather than attributes['foo']), I get the error below. This indicates that the runtime does know that these are strings.

This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.


Solution

  • Currently, Flink SQL supports only java.util.Map. Scala maps are treated as a black box with Flink's GenericTypeInfo, which corresponds to the SQL ANY data type. Therefore, you can forward these black boxes and use them within scalar functions, but accessing them with the ['key'] operator is not supported.

    So either use a Java map, or implement the map access yourself in a user-defined scalar function; sketches of both options follow below.

    I created an issue for your problem: https://issues.apache.org/jira/browse/FLINK-7360
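    One way to take the Java-map route, given that the Scala map field is analyzed as a black box here, is to convert the stream to Rows with explicit type information before registering it. This is a sketch; the Row layout and the conversion are assumptions, not the only way to do this:

    import java.util.{HashMap => JHashMap}

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.{MapTypeInfo, RowTypeInfo}
    import org.apache.flink.types.Row

    // Explicit types: name STRING, age INT, attributes MAP of STRING to STRING.
    val rowType: RowTypeInfo = new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        new MapTypeInfo(classOf[String], classOf[String])),
      Array("name", "age", "attributes"))

    // Convert each Person to a Row, copying the Scala map into a java.util.Map.
    val rows: DataStream[Row] = persons.map { p =>
      val attrs = new JHashMap[String, String]()
      p.attributes.foreach { case (k, v) => attrs.put(k, v) }
      val row = new Row(3)
      row.setField(0, p.name)
      row.setField(1, p.age)
      row.setField(2, attrs)
      row
    }(rowType)

    streamTableEnvironment.registerDataStream(
      "myTable", rows, 'name, 'age, 'attributes, 'rowtime.rowtime)

    With a proper MAP type in place, the original query with attributes['foo'] should then be accepted by the planner.

    Alternatively, a minimal sketch of the UDF route; the mapValue function and its registration are illustrative, not built-ins:

    import org.apache.flink.table.functions.ScalarFunction

    // Looks up a key in a Scala map that Flink SQL otherwise treats as ANY.
    class MapValue extends ScalarFunction {
      def eval(map: Map[String, String], key: String): String =
        map.getOrElse(key, null)
    }

    streamTableEnvironment.registerFunction("mapValue", new MapValue)

    val result = streamTableEnvironment.sql(
      """
      |SELECT
      |name,
      |mapValue(attributes, 'foo'),
      |TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
      |FROM myTable
      |GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, mapValue(attributes, 'foo')
      |""".stripMargin)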