I am trying to take a Flink Table and convert it into a retracting stream that then gets wired into a sink. I was able to do this with the original table planner using a CRow, but Flink's Blink planner no longer seems to support CRow. Is there a way to accomplish this while using the Blink planner?
For reference, we were able to do this before by mapping the retracting stream to the CRow type before wiring it into the RetractStreamTableSink.
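For context, the old-planner conversion looked roughly like this (a sketch of the same mapping that appears commented out in the unit test below, with outputTable and rowTypeInfo set up as in that test):

// Legacy planner only: wrap each (change flag, row) pair in a CRow before handing it to a RetractStreamTableSink
DataStream<CRow> cRowStream = tableEnvironment
        .toRetractStream(outputTable, rowTypeInfo)
        .map(value -> new CRow(value.f1, value.f0)) // CRow(row, change flag)
        .returns(new CRowTypeInfo(rowTypeInfo));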
Below is a unit test example of what I am trying to accomplish, note the commented out code block works correctly in the old planner.
This fails with the following exception, which makes sense given that the retracting stream is of type Tuple2<Boolean, Row> and the sink is of type Row, but I don't see a way to use a Tuple2 retracting DataStream with a RetractStreamTableSink<Row>:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [f0: STRING, f1: STRING]
@Test
public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "2");

    SingleOutputStreamOperator<Row> source =
            executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);
    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select key, id from table1");
    RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // This code block below works on the Flink planner but fails on the Blink planner because Blink treats all
    // non-tuples as POJOs
    // SingleOutputStreamOperator<?> tuple2DataStream = tableEnvironment
    //         .toRetractStream(outputTable, rowTypeInfo)
    //         .map(value -> new CRow(value.f1, value.f0))
    //         .returns(new CRowTypeInfo(rowTypeInfo));

    // Create the retracting stream
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

    // Create a sink
    TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
    CollectingTableSink collectingTableSink = new CollectingTableSink(schema);
    RetractSink retractTableSink = new RetractSink(collectingTableSink);
    tableEnvironment.registerTableSink("sink2", retractTableSink);

    // Wire up the table and the sink (this is what fails)
    tableEnvironment.from("outputTable").insertInto("sink2");

    executionEnvironment.execute();
    System.out.println(collectingTableSink.rows);
}
So I found a workaround for this problem: if you make a shim that implements AppendStreamTableSink<Tuple2<Boolean, Row>>, have it implement the methods that RetractStreamTableSink provides as defaults, and override the consumeDataStream method as shown below, you can go back from Tuple2 to Row without needing CRow.
This is exactly what RetractStreamTableSink is meant for, but something causes Blink to fail when using it, even in the case where the AppendStreamTableSink and RetractStreamTableSink are identical (all methods overridden and equal; the only difference is the name of the interface you implement). I strongly suspect this is a bug in the Blink planner, but I have been unable to identify where it is coming from.
Code snippet that does the conversion:
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    // Keep only accumulate messages (f0 == true), unwrap the Row, and hand it to the wrapped Row sink
    DataStream<Row> filteredAndMapped =
            dataStream.filter(x -> x.f0).map(x -> x.f1).returns(delegate.getOutputType());
    return delegate.consumeDataStream(filteredAndMapped);
}
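For completeness, here is a rough sketch of what the whole shim could look like. This is my reconstruction rather than the exact RetractSink class used in the test above: the delegate type, constructor, and the TupleTypeInfo wrapping in getOutputType are assumptions, and depending on your Flink version you may also need to override the deprecated emitDataStream method.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// Shim sink: declared as an AppendStreamTableSink<Tuple2<Boolean, Row>> so the Blink planner accepts it,
// but it behaves like a RetractStreamTableSink by unwrapping the retraction tuples before
// handing plain Rows to a wrapped sink.
public class RetractSink implements AppendStreamTableSink<Tuple2<Boolean, Row>> {

    private final AppendStreamTableSink<Row> delegate; // e.g. the CollectingTableSink from the test (assumed type)

    public RetractSink(AppendStreamTableSink<Row> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TableSchema getTableSchema() {
        return delegate.getTableSchema();
    }

    @Override
    public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
        // The same (change flag, record) wrapping that RetractStreamTableSink provides by default
        return new TupleTypeInfo<>(Types.BOOLEAN, delegate.getOutputType());
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // Keep only accumulate messages (f0 == true), unwrap the Row, and let the delegate consume it
        DataStream<Row> filteredAndMapped =
                dataStream.filter(x -> x.f0).map(x -> x.f1).returns(delegate.getOutputType());
        return delegate.consumeDataStream(filteredAndMapped);
    }
}

With a shim along these lines, the wiring stays the same as in the test: registerTableSink("sink2", new RetractSink(collectingTableSink)) followed by insertInto("sink2").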