Tags: scala, apache-flink, flink-streaming, flink-sql, apache-iceberg

Scala Option types not recognized in the Apache Flink Table API


I am working on building a Flink application that reads data from Kafka topics, applies some transformations, and writes to an Iceberg table.

I read the data from a Kafka topic (which is in JSON) and use circe to decode it into a Scala case class with Option values in it. All the transformations on the DataStream work fine.

The case class looks like this:

case class Event(app_name: Option[String], service_name: Option[String], ......)

But when I convert the stream to a table in order to write to the Iceberg table, the Option columns are mapped to the RAW type, as shown below.

table.printSchema()

service_name RAW('scala.Option', '...'),
conversion_id RAW('scala.Option', '...'),
......
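For reference, the conversion is roughly the following (trimmed down to two fields; the environment and stream names are just placeholders):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

case class Event(app_name: Option[String], service_name: Option[String])

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// In the real job this stream comes from Kafka and is decoded with circe
val events: DataStream[Event] =
  env.fromElements(Event(Some("app"), Some("svc")), Event(None, None))

val table = tableEnv.fromDataStream(events)
table.printSchema() // the Option fields show up as RAW('scala.Option', '...')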

And the table write fails as shown below.

Query schema: [app_name: RAW('scala.Option', '...'), .........
Sink schema: [app_name: STRING, .......

Does the Flink Table API support Scala case classes with Option values? I found that they are supported in the DataStream API, per this documentation: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#special-types

Is there a way to do this in the Table API?


Solution

  • The type system of the Table API is more restrictive than that of the DataStream API. Unsupported classes are immediately treated as the black-box type RAW. This allows objects to still pass through the API, but they might not be supported by every connector.

    From the exception, it looks like you declared the sink table with app_name: STRING, so I guess you are fine with a string representation. If that is the case, I would recommend implementing a user-defined function that performs the conversion to a string; a sketch of such a function follows.
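
    A minimal sketch of such a function, assuming the Option fields hold strings and using InputGroup.ANY so the RAW-typed column can be passed in (the function and column names are just examples):

    import org.apache.flink.table.annotation.{DataTypeHint, InputGroup}
    import org.apache.flink.table.api._
    import org.apache.flink.table.functions.ScalarFunction

    // Unwraps a scala.Option that the Table API only sees as RAW and
    // returns its content as a String (null for None).
    class OptionToString extends ScalarFunction {
      def eval(@DataTypeHint(inputGroup = InputGroup.ANY) value: AnyRef): String =
        value match {
          case Some(v) => v.toString
          case None    => null
          case null    => null
          case other   => other.toString
        }
    }

    Register the function and apply it to the affected columns before writing to the sink, for example:

    tableEnv.createTemporarySystemFunction("OptionToString", classOf[OptionToString])

    val converted = table.select(
      call("OptionToString", $"app_name").as("app_name"),
      call("OptionToString", $"service_name").as("service_name")
    )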