
How to attach schema to a Flink DataStream - on the fly?


I am dealing with a stream of database mutations, i.e., a change log stream. I want to be able to transform the values using a SQL query. I am having difficulty putting together the following three concepts: RowTypeInfo, Row, and DataStream.

NOTE: I don't know the schema beforehand. I construct it on the fly using the data within the Mutation object (Mutation is a custom type).

More specifically, I have code that looks like this:

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns a collection of org.apache.flink.types.Row objects
val rowStream: DataStream[Row] = mutationStream.flatMap(mutation => toRows(mutation))
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("SELECT col1 + 2 FROM spinal_tap_table")

NOTE: The Row object is positional and doesn't have a placeholder for column names. I couldn't find a way to attach a schema to the DataStream object.

I want to pass some sort of struct, similar to Row, that contains the complete information {columnName: String, columnValue: Object, columnType: TypeInformation[_]} for the query.
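Conceptually, something like the following hypothetical case class (the NamedColumn name and its fields are illustrative, not a Flink type):

import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical self-describing column; not part of the Flink API.
case class NamedColumn(
  columnName: String,
  columnValue: AnyRef,
  columnType: TypeInformation[_])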


Solution

  • In Flink SQL, a table schema is mandatory when the Table is defined. It is not possible to run queries on dynamically typed records.

    Regarding the concepts of RowTypeInfo, Row, and DataStream (see the sketch after this list):

    • Row is the actual record that holds the data.
    • RowTypeInfo is a schema description for Rows. It contains names and TypeInformation for each field of a Row.
    • DataStream is a logical stream of records. A DataStream[Row] is a stream of rows. Note that this is not the actual stream itself but an API concept for representing a stream.
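
    To tie the three together, here is a minimal sketch, assuming the schema can be derived from a Mutation while the job graph is being built. The two-column schema and the field names col1/col2 are assumptions for illustration; mutationStream, toRows, and tableEnv are the ones from the question.

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.types.Row

    // Schema assumed to be derived up front: (col1: LONG, col2: STRING).
    val rowTypeInfo = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
      Array("col1", "col2"))

    // Passing the RowTypeInfo as the (otherwise implicit) TypeInformation
    // parameter attaches the schema to the resulting DataStream[Row].
    val typedRowStream: DataStream[Row] =
      mutationStream.flatMap(mutation => toRows(mutation))(rowTypeInfo)

    // The Table API picks up the field names from the RowTypeInfo.
    tableEnv.registerDataStream("spinal_tap_table", typedRowStream)
    val result = tableEnv.sql("SELECT col1 + 2 FROM spinal_tap_table")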