Tags: apache-kafka, apache-kafka-connect, change-data-capture, debezium

How to capture data in MySQL with Debezium change data capture and consume it with the JDBC sink in Kafka Connect?


I have a problem capturing data in MySQL with Debezium change data capture and consuming it into another MySQL database using the Kafka Connect JDBC sink.

This is because the schema and payload that Debezium produces to the Kafka topic are not compatible with the schema that the Kafka Connect JDBC sink expects.

I get an exception when the JDBC sink tries to consume the data and create records in the other MySQL database.

How should I solve this problem?


Solution

  • The message structure produced by Debezium is indeed different from the one expected by the JDBC sink. The JDBC sink expects each field in the message to correspond to a column in the row, so the message as a whole represents the "after" state of the row. The Debezium MySQL connector, on the other hand, performs change data capture, which means it does more than just include the latest state of the row. Specifically, the connector outputs messages whose key contains the row's primary or unique key columns, and whose value contains an envelope structure with (an example event is sketched after this list):

    • the operation, such as whether it is an insert, update, or delete
    • the state of the row before the change occurred (null on inserts)
    • the state of the row after the change occurred (null on deletes)
    • source-specific information, including server metadata, the transaction ID, the database and table names, the server timestamp when the event occurred, and details about where in the binlog the event was found
    • the timestamp at which the connector generated the event
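
    To make that concrete, here is roughly what the value of a Debezium MySQL update event looks like when serialized with the JSON converter (the schema portion is omitted for brevity); the server, database, table, and column names are made up, and the exact set of source fields varies slightly between Debezium versions:

    ```json
    {
      "op": "u",
      "before": { "id": 1004, "first_name": "Anne", "email": "anne@example.com" },
      "after":  { "id": 1004, "first_name": "Anne Marie", "email": "anne@example.com" },
      "source": {
        "name": "dbserver1",
        "db": "inventory",
        "table": "customers",
        "file": "mysql-bin.000003",
        "pos": 805
      },
      "ts_ms": 1493820964123
    }
    ```

    The JDBC sink, by contrast, wants the message value to be just the flat "after" row: {"id": 1004, "first_name": "Anne Marie", "email": "anne@example.com"}.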

    The simplest way to bridge this discrepancy is to use Kafka 0.10.2.x (currently the latest release is 0.10.2.1) and Kafka Connect's new Single Message Transforms (SMTs). Each Kafka Connect connector can be configured with a chain of zero or more SMTs that transform the output of source connectors before the messages are written to Kafka, or that transform the messages read from Kafka before they are passed as input to sink connectors. SMTs are intentionally very simple: each one deals with a single message and should not access external resources or maintain any state. They are therefore not a replacement for Kafka Streams or other stream processing systems, which are far more powerful, can join multiple input streams, and can perform very complex operations and maintain state across multiple messages.
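
    Mechanically, SMTs are declared per connector via the transforms property, which names an ordered chain, plus one transforms.<name>.type entry (and any transform-specific options) per element. Purely as an illustration of the shape, here is a two-element chain using two of the stock transforms that ship with Kafka Connect; the chain names and field value are arbitrary:

    ```json
    {
      "transforms": "addTopic,route",
      "transforms.addTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.addTopic.topic.field": "source_topic",
      "transforms.route.type": "org.apache.kafka.connect.transforms.TimestampRouter"
    }
    ```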

    If you're already using Kafka Streams to do any kind of processing, then you should consider manipulating the message structure in your Kafka Streams application. If not, then SMTs are a great way to solve your problem. In fact, there are two ways to use SMTs to adjust the message structure.

    The first option is to use an SMT with the Debezium connector to extract/retain the "after" state of the row and discard all the other information before it is written to Kafka. Of course, you'd be storing less information in the Kafka topics and throwing away some of the CDC information that may be valuable in the future.
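
    As a rough sketch of that first option (not a drop-in config): Debezium ships an SMT that unwraps the envelope and keeps only the "after" state, called io.debezium.transforms.UnwrapFromEnvelope in older releases and io.debezium.transforms.ExtractNewRecordState in newer ones. Applied on the source connector it might look like the following; the hostnames, credentials, and database names are placeholders, and database.whitelist is the older spelling of what newer versions call database.include.list:

    ```json
    {
      "name": "inventory-source",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql-source",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz-secret",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
      }
    }
    ```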

    The second and, IMO, preferred option is to leave the source connector as-is and keep all of the CDC messages in the Kafka topics, but then use an SMT with the sink connector to extract/retain the "after" state of the row and discard all the other information before the message is passed to the JDBC sink connector. You may be able to use one of the existing SMTs included in Kafka Connect, but you might consider writing your own SMT to do exactly what you want.
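
    Again only as a sketch, the second option keeps the source connector unchanged and applies the same unwrap SMT on the sink side instead. This assumes Confluent's JDBC sink connector and requires the Debezium SMT jar to be on the sink worker's classpath; the topic name, connection URL, and credentials are placeholders:

    ```json
    {
      "name": "inventory-jdbc-sink",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "dbserver1.inventory.customers",
        "connection.url": "jdbc:mysql://mysql-sink:3306/inventory",
        "connection.user": "sink",
        "connection.password": "sink-secret",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "auto.create": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
      }
    }
    ```

    One thing to watch for with either option: delete events carry a null "after" state, so the unwrap SMT needs to drop or rewrite them (the newer ExtractNewRecordState variant has options for this) before the JDBC sink sees them.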