I'm able to sink a static Row to a database:
DataStream<Row> staticRows = environment.fromElements(Row.of("value1", "value2"));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment); // convert to table API
Table inputTable = tableEnv.fromDataStream(staticRows);
tableEnv.executeSql(myDDLAndSinkProperties);
inputTable.executeInsert("MYTABLE");
But mapping an unbounded stream to a Row like this:
DataStream<Row> kafkaRows = kafkaEvents.map(new MyKafkaRecordToRowMapper());
throws an error when the DB insert is attempted, saying that the query and sink schemas do not match. Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
The same code works for a POJO and a Tuple, but I have more than 25 columns and the POJO doesn't serve any other purpose, so I'm hoping it could be replaced by a general-purpose sequence of fields (which Row claims to be). How do I use Row as input to a database sink? The examples I've found only show it used for static data streams and database output.
I think this will work better if you change it to something like this (after adjusting the column names and types, of course):
DataStream<Row> kafkaRows = kafkaEvents
    .map(new MyKafkaRecordToRowMapper())
    // Declare the field names and types of the produced Row explicitly
    .returns(Types.ROW_NAMED(
        new String[] {"id", "quota", "ts", ...},
        Types.STRING,
        Types.LONG,
        TypeInformation.of(Instant.class),
        ...));
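As far as I understand it, the reason is that Flink cannot work out the fields of a Row from the mapper's signature alone, so it falls back to a generic type, which the Table API then shows as a single RAW column. Here is a minimal, self-contained sketch of the same idea that you can use to inspect the resulting schema. The class name RowTypeSketch, the two-column layout, and the fromElements stand-in for the Kafka source are all made up for illustration and are not from your code:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowTypeSketch {

    // Hypothetical two-column layout; replace with your real column names and types.
    static TypeInformation<Row> namedRowType() {
        return Types.ROW_NAMED(new String[] {"id", "quota"}, Types.STRING, Types.LONG);
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Stand-in for the Kafka source: each element is an "id,quota" string.
        DataStream<String> events = env.fromElements("a,1", "b,2");

        DataStream<Row> rows = events
                .map(line -> {
                    String[] parts = line.split(",");
                    return Row.of(parts[0], Long.parseLong(parts[1]));
                })
                // Explicit type hint so the Row's fields become visible to the Table API.
                .returns(namedRowType());

        Table table = tableEnv.fromDataStream(rows);
        // Prints the derived schema so it can be compared with the sink's DDL.
        table.printSchema();
    }
}
```

With the returns(...) hint in place, printSchema() should list id and quota as separate columns rather than a single RAW f0 column, and those are the names and types the sink table's DDL has to match.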