Tags: java, apache-flink, flink-sql

Flink SQL : Use changelog stream to update rows in Dynamic Table


I have a stream of JSON messages that look like this:

{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}

This stream is handled in a DataStream<Row> that is registered as a TableSource.

I want to use this stream as a changelog stream to update the content of a Flink Table, but I can't find a way to do that.

I have defined a StreamTableSource as follows:

public class MyTableSource implements StreamTableSource<Row>, ... {

    @Override
    public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
        DataStream<Row> stream = readChangelogStream(env) // Retrieve changelog stream (hypothetical helper; calling getDataStream(env) here would recurse)
                .keyBy([SOME KEY])                        // Partition by key
                .map(new MyMapFunction());                // Map the update message with the correct encoding?

        return stream;
    }

    ... 
}

This TableSource is then used as follows:

public void process(final StreamExecutionEnvironment env) {
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    tableEnv.registerTableSource("MyTableSource", new MyTableSource());

    Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource"); // This table content should be updated according to operation described in the changelog stream.

    result.insertInto([SOME SINK]);
}

What is the right way to do this? (More specifically, how can I use a stream to delete rows from a table?)


Solution

  • Currently, Flink's internal changelog processing capabilities are not exposed through the API, so there is no source that lets you interpret an incoming changelog as a table. This is planned for Flink 1.11.

    Until then, you could use a user-defined aggregate function that applies the updates, as proposed here:

    Apache Flink: How to enable "upsert mode" for dynamic tables?
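To illustrate the idea (this is not the code from the linked answer), here is a minimal sketch of the per-key logic such an aggregate could apply if the changelog is grouped by `id`. It is plain Java with no Flink dependency so it runs on its own. The names `UpsertAggregateSketch`, `LastValueAcc` and `replay` are hypothetical. In a real job, `createAccumulator`, `accumulate` and `getValue` would become methods of a class extending `org.apache.flink.table.functions.AggregateFunction<String, LastValueAcc>`, and `replay` stands in for Flink's `GROUP BY id`. Dropping deleted keys with a final filter is an assumption; in SQL it might correspond to something like `WHERE ... IS NOT NULL` on the aggregated result, which should be verified against your Flink version.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free sketch of an "apply the changelog per key" aggregate. */
public class UpsertAggregateSketch {

    /** Accumulator holding the latest state of a single key. */
    public static final class LastValueAcc {
        String value;    // latest value seen for this key
        boolean deleted; // true if the most recent operation was DELETE
    }

    public static LastValueAcc createAccumulator() {
        return new LastValueAcc();
    }

    /** Applies one change message (operation, value) to the key's accumulator. */
    public static void accumulate(LastValueAcc acc, String operation, String value) {
        switch (operation) {
            case "CREATE":
            case "UPDATE":
                acc.value = value;  // upsert: the newest value wins
                acc.deleted = false;
                break;
            case "DELETE":
                acc.value = null;
                acc.deleted = true;
                break;
            default:
                throw new IllegalArgumentException("Unknown operation: " + operation);
        }
    }

    /** Aggregate result: the current value, or null if the key was deleted. */
    public static String getValue(LastValueAcc acc) {
        return acc.deleted ? null : acc.value;
    }

    /** Simulates GROUP BY id: one accumulator per key, fed in arrival order. */
    public static Map<String, String> replay(List<String[]> messages) {
        Map<String, LastValueAcc> accs = new LinkedHashMap<>();
        for (String[] m : messages) { // m = {operation, id, value}
            LastValueAcc acc = accs.computeIfAbsent(m[1], k -> createAccumulator());
            accumulate(acc, m[0], m[2]);
        }
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, LastValueAcc> e : accs.entrySet()) {
            String v = getValue(e.getValue());
            if (v != null) { // filter out keys whose last operation was DELETE
                table.put(e.getKey(), v);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // The four example messages from the question, as {operation, id, value}.
        List<String[]> changelog = Arrays.asList(
                new String[] {"CREATE", "id-1", "value-1"},
                new String[] {"CREATE", "id-2", "value-2"},
                new String[] {"DELETE", "id-1", null},
                new String[] {"UPDATE", "id-2", "value-3"});
        System.out.println(replay(changelog)); // prints {id-2=value-3}
    }
}
```

Replaying the question's four messages leaves a single row, `id-2 -> value-3`: `id-1` is removed by its DELETE and `id-2` keeps its latest value. Keeping an explicit `deleted` flag, rather than relying on `null` alone, keeps the logic correct even if a legitimate value could itself be null.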