I'm having trouble reading a Debezium changelog from a Kinesis stream. Could I please get some insight into how to parse the changelog events using Flink SQL?
Below is my attempt to parse the stream via the Flink SQL client:
Flink SQL> CREATE TABLE test_table (
> city_id INT,
> country_id INT,
> city STRING,
> last_update timestamp
> )
> WITH (
> 'connector' = 'kinesis',
> 'stream' = 'kinesis.sakila.city',
> 'aws.region' = 'us-east-1',
> 'scan.stream.initpos' = 'TRIM_HORIZON',
> 'format' = 'debezium-json'
> );
[INFO] Table has been created.
Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema
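The Flink documentation has a table showing which formats each connector supports. According to that table, the Kinesis connector does not support the Debezium changelog format (debezium-json), which is why the query fails. As the error message indicates, DebeziumJsonDeserializationSchema emits its rows through a Collector, and the Kinesis consumer does not accept that kind of DeserializationSchema.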
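One possible workaround, offered as a sketch rather than a confirmed solution, is to read the stream with the plain 'json' format and unpack the Debezium envelope yourself. This assumes the events arrive without the schema wrapper, so that before, after, op and ts_ms are top-level fields, and that last_update is read as a string. The table name city_raw is hypothetical; the connector options are copied from the question.

```sql
-- Sketch only: assumes Debezium events without the "schema"/"payload" wrapper.
-- city_raw is a hypothetical table name; connector options come from the question.
CREATE TABLE city_raw (
  `before` ROW<city_id INT, country_id INT, city STRING, last_update STRING>,
  `after`  ROW<city_id INT, country_id INT, city STRING, last_update STRING>,
  `op`     STRING,   -- Debezium operation code: c, u, d or r
  `ts_ms`  BIGINT    -- time the connector processed the event
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kinesis.sakila.city',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'TRIM_HORIZON',
  'format' = 'json'
);

-- Project the row image after each insert, update or snapshot read.
SELECT
  r.`after`.city_id     AS city_id,
  r.`after`.country_id  AS country_id,
  r.`after`.city        AS city,
  r.`after`.last_update AS last_update,
  r.`op`                AS op
FROM city_raw AS r
WHERE r.`op` <> 'd';
```

Note that this gives an append-only stream of change events, not a changelog table: Flink will not apply updates and deletes as retractions the way debezium-json would, so any deduplication by key has to be done in the query.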