
Can the Debezium MySQL connector route data change events to different topics by operation type?


Debezium supports three types of data change event:

  • insert
  • delete
  • update

I know that the payload of each message Debezium publishes has an op field that identifies the event type, but is there any way to route these three types of data change event to different Kafka topics based on the operation type, for example with an SMT?


Solution

  • Single Message Transform

    As you suggest, a Single Message Transform is a good option here. Debezium has a transform called ContentBasedRouter (in beta at the time of writing) that lets you code the routing logic in a scripting language such as Groovy.

  • ksqlDB

    You can do this with ksqlDB:

    -- Declare source topic from Debezium as ksqlDB stream
    CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');
    
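    -- Create three streams (each backed by a Kafka topic of the same name), one per op type.
    -- Rows captured by the initial snapshot have OP='r', so they match none of these filters.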
    CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
    CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
    CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';
    

    View the resulting topics

    ksql> SHOW TOPICS;
    
     Kafka Topic                           | Partitions | Partition Replicas
    -------------------------------------------------------------------------
     ORDERS_CREATES                        | 1          | 1
     ORDERS_DELETES                        | 1          | 1
     ORDERS_UPDATES                        | 1          | 1
    

    Check the counts

    ksql> SELECT OP,COUNT(*) AS EVENTS FROM ORDERS GROUP BY OP EMIT CHANGES;
    +-------+----------+
    |OP     |EVENTS    |
    +-------+----------+
    |u      |3         |
    |c      |502       |
    |d      |5         |
    
    ksql> SELECT 'ORDERS_UPDATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
            FROM ORDERS_UPDATES GROUP BY 'ORDERS_UPDATES' EMIT CHANGES LIMIT 1 ;
    +----------------+-------------+
    |TOPIC_NAME      |EVENT_COUNT  |
    +----------------+-------------+
    |ORDERS_UPDATES  |3            |
    Limit Reached
    Query terminated
    
    ksql> SELECT 'ORDERS_CREATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
            FROM ORDERS_CREATES GROUP BY 'ORDERS_CREATES' EMIT CHANGES LIMIT 1 ;
    +----------------+-------------+
    |TOPIC_NAME      |EVENT_COUNT  |
    +----------------+-------------+
    |ORDERS_CREATES  |503          |
    Limit Reached
    Query terminated
    
    ksql> SELECT 'ORDERS_DELETES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
            FROM ORDERS_DELETES GROUP BY 'ORDERS_DELETES' EMIT CHANGES LIMIT 1 ;
    +----------------+-------------+
    |TOPIC_NAME      |EVENT_COUNT  |
    +----------------+-------------+
    |ORDERS_DELETES  |5            |
    Limit Reached
    Query terminated
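For the Single Message Transform option, the ContentBasedRouter settings below are a minimal sketch to add to the Debezium MySQL connector configuration. The sketch makes three assumptions. First, the SMT sees the full Debezium envelope, so the op field is still present. If you also use ExtractNewRecordState, the router has to run before it. Second, a Groovy JSR 223 engine has been added to the connector's plugin directory, because Debezium does not bundle one. Third, the topic names orders_creates, orders_updates and orders_deletes are hypothetical placeholders. Check the property names against the content-based routing documentation for your Debezium version.

```properties
# Hypothetical routing config: topic names below are placeholders, not Debezium defaults
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Pass tombstone records (null value) through unchanged instead of evaluating the expression on them
transforms.route.null.handling.mode=keep
# Pick a destination topic from the envelope's op field; returning null keeps the record on its original topic
transforms.route.topic.expression=value.op == 'c' ? 'orders_creates' : (value.op == 'u' ? 'orders_updates' : (value.op == 'd' ? 'orders_deletes' : null))
```

Because the expression returns null for anything other than c, u and d, snapshot reads (op 'r') stay on the topic Debezium originally chose. The target topics must already exist, or topic auto-creation must be enabled.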