apache-flink debezium flink-sql

Flink SQL (v1.12.1) unable to read Debezium changelog from Kinesis stream


I'm having trouble reading a Debezium changelog from a Kinesis stream. How can I parse the changelog events using Flink SQL?

Below is my attempt to parse the stream via the Flink SQL client:

Flink SQL> CREATE TABLE test_table (
>   city_id INT,
>   country_id INT,
>   city STRING,
>   last_update timestamp
> )
> WITH (
>   'connector' = 'kinesis',
>   'stream' = 'kinesis.sakila.city',
>   'aws.region' = 'us-east-1',
>   'scan.stream.initpos' = 'TRIM_HORIZON',
>   'format' = 'debezium-json'
> );
[INFO] Table has been created.

Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema


Solution

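  • The Flink documentation includes a table showing which connectors support each format. In that table, the Debezium changelog format (debezium-json) is not listed as supported by the Kinesis connector. The error message says the same thing: the Kinesis consumer rejects any DeserializationSchema that emits records through a Collector, which is how DebeziumJsonDeserializationSchema works.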
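
One possible workaround, offered only as a sketch and not as part of the original answer, is to read the stream with the plain json format and declare the Debezium envelope fields yourself. The table name city_raw is hypothetical. The envelope layout (before, after, op, ts_ms at the top level) assumes Debezium is emitting events without the embedded schema wrapper; if schemas are enabled, these fields sit under a payload field instead. last_update is declared as STRING because the timestamp encoding Debezium produces may not match what the json format expects. This reads the events as an append-only stream, so Flink will not apply them as updates or deletes the way debezium-json would.

```sql
-- Hypothetical table: exposes the raw Debezium envelope instead of a changelog.
CREATE TABLE city_raw (
  `before` ROW<city_id INT, country_id INT, city STRING, last_update STRING>,
  `after`  ROW<city_id INT, country_id INT, city STRING, last_update STRING>,
  op STRING,      -- Debezium operation code: c, u, d, or r (snapshot read)
  ts_ms BIGINT    -- time at which the connector processed the event
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kinesis.sakila.city',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'TRIM_HORIZON',
  'format' = 'json'   -- plain JSON, which the Kinesis connector accepts
);

-- Project the new row image for inserts, snapshot reads, and updates.
SELECT `after`.city_id, `after`.country_id, `after`.city, `after`.last_update
FROM city_raw
WHERE op IN ('c', 'r', 'u');
```

Deletes (op = 'd') carry their row image in before and would need separate handling under this approach.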