Tags: apache-flink, flink-streaming

Flink interprets a mapped Row as a single RAW


I'm able to sink a static Row to a database:

DataStream<Row> staticRows = environment.fromElements(Row.of("value1"), Row.of("value2"));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment); // convert to table API
Table inputTable = tableEnv.fromDataStream(staticRows);
tableEnv.executeSql(myDDLAndSinkProperties);
inputTable.executeInsert("MYTABLE");

But mapping an unbounded stream to a Row like this:

DataStream<Row> kafkaRows = kafkaEvents.map(new MyKafkaRecordToRowMapper());

throws an error when the database insert is attempted, because the query and sink schemas do not match:

Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]

The same code works for a POJO and a Tuple, but I have more than 25 columns and the POJO doesn't serve any other purpose, so I'm hoping it can be replaced by a general-purpose sequence of fields (which Row claims to be). How do I use Row as input to a database? The examples I've found only show it used for static data streams and database output.


Solution

  • I think this will work better if you change it to something like this (after adjusting the column names and types, of course). Row is a generic container, so Flink's type extraction cannot infer its field names and types the way it can for a POJO or Tuple; without explicit type information, the mapped stream falls back to a RAW type. Supplying the row type via returns() resolves the schema mismatch:

    // Needed imports:
    // import java.time.Instant;
    // import org.apache.flink.api.common.typeinfo.TypeInformation;
    // import org.apache.flink.api.common.typeinfo.Types;
    // import org.apache.flink.types.Row;

    DataStream<Row> kafkaRows = kafkaEvents
      .map(new MyKafkaRecordToRowMapper())
      .returns(Types.ROW_NAMED(
          new String[] {"id", "quota", "ts", ...},
          Types.STRING,
          Types.LONG,
          TypeInformation.of(Instant.class),
          ...));
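
  • For completeness, here is a minimal sketch of what the mapper itself might look like. KafkaEvent and its getters are assumptions for illustration (they are not part of the original question); the positional order of the values must match the names and types passed to Types.ROW_NAMED:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.types.Row;

    // Hypothetical mapper: adjust KafkaEvent and the getters to your record type.
    public class MyKafkaRecordToRowMapper implements MapFunction<KafkaEvent, Row> {
      @Override
      public Row map(KafkaEvent event) {
        // Positions must line up with the ROW_NAMED declaration: id, quota, ts, ...
        return Row.of(event.getId(), event.getQuota(), event.getTimestamp());
      }
    }

    Declaring the names through Types.ROW_NAMED is what lets the Table API match the stream's columns against the sink DDL, instead of seeing a single opaque RAW field.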