I am dealing with a stream of database mutations, i.e., a change log stream. I want to be able to transform the values using a SQL query.
I am having difficulty putting together the following three concepts: `RowTypeInfo`, `Row`, and `DataStream`.
NOTE: I don't know the schema beforehand. I construct it on the fly using the data within the `Mutation` object (`Mutation` is a custom type).
More specifically, I have code that looks like this:
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...

// toRows converts a Mutation into objects of type org.apache.flink.types.Row
val rowStream: DataStream[Row] = mutationStream.flatMap(mutation => toRows(mutation))

tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2 from spinal_tap_table")
```
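For context, here is a minimal sketch of what `toRows` could look like; the shape of `Mutation` (a `columnValues` field holding ordered value arrays) is purely an assumption, since `Mutation` is a custom type:

```scala
import org.apache.flink.types.Row

// Hypothetical sketch: Mutation's internals are assumptions.
// Assume each Mutation carries one or more ordered arrays of column values.
def toRows(mutation: Mutation): Seq[Row] = {
  mutation.columnValues.map { values =>
    val row = new Row(values.length)   // a Row is created with a fixed arity
    values.zipWithIndex.foreach { case (v, i) =>
      row.setField(i, v)               // fields are set by position only
    }
    row
  }
}
```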
NOTE: The `Row` object is positional and doesn't have a placeholder for column names. I couldn't find a place to attach the schema to the `DataStream` object. I want to pass some sort of struct similar to `Row` that contains the complete information `{columnName: String, columnValue: Object, columnType: TypeInformation[_]}` for the query.
In Flink SQL a table schema is mandatory when the `Table` is defined. It is not possible to run queries on dynamically typed records.
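In the Scala DataStream API, the `TypeInformation` of an operator's output is passed as an implicit parameter, so one way to attach a schema is to bring a `RowTypeInfo` into implicit scope before the `flatMap`. A sketch, reusing `mutationStream` and `tableEnv` from the question and assuming the field names and types can be determined when the job is assembled:

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// Build the schema at job-assembly time; the names and types here are examples.
val fieldNames = Array("col1", "col2")
val fieldTypes = Array[TypeInformation[_]](
  BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

// RowTypeInfo is the TypeInformation (i.e., the schema) for Rows.
implicit val rowType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)

// flatMap picks up the implicit TypeInformation[Row], so the resulting
// DataStream[Row] carries the schema instead of a generic type.
val rowStream: DataStream[Row] = mutationStream.flatMap(mutation => toRows(mutation))

tableEnv.registerDataStream("spinal_tap_table", rowStream)
val result = tableEnv.sql("select col1 + 2 from spinal_tap_table")
```

Note that this only moves the problem to job-assembly time: the schema still has to be fixed before the program runs and cannot vary per record.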
Regarding the concepts of `RowTypeInfo`, `Row`, and `DataStream`:

- `Row` is the actual record that holds the data.
- `RowTypeInfo` is a schema description for `Row`s. It contains names and `TypeInformation` for each field of a `Row`.
- `DataStream` is a logical stream of records. A `DataStream[Row]` is a stream of rows. Note that this is not the actual stream but just an API concept to represent a stream in the API.
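To make the relationship concrete, a small sketch (the field names and types are made up for illustration):

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// RowTypeInfo: the schema. Field names and types live here, not in the Row.
val schema = new RowTypeInfo(
  Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
  Array("name", "count"))

// Row: the record. It is purely positional and created with a fixed arity.
val row = new Row(2)
row.setField(0, "flink")
row.setField(1, 42)
```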