I am building a data synchronizer that captures data changes from a MySQL source and exports them to Hive.
I chose Kafka Connect to implement this, with Debezium as the source connector and the Confluent HDFS connector as the sink.
Debezium provides a single message transformation (SMT) that lets me extract the "after" field from the complex event message. I used the same configuration as the documentation lists, but it didn't work:
{
    // omit ...
    "transform": "unwrap",
    "transform.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
I've tried configuring the transform on both the source connector side and the sink connector side, but it still doesn't work. In fact, when I configure it on the source connector side and then check the messages in the corresponding topic, I find that they still contain all fields, including "before", "source", etc.
ythh@openstack2:~/confluent-5.5.0$ bin/kafka-avro-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic dbserver1.test_data_1.student3
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":1,"name":"ggg"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005572000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":9474,"row":0,"thread":{"long":6013},"query":null},"op":"c","ts_ms":{"long":1589005572172},"transaction":null}
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":2,"name":"no way"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005893000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":11218,"row":0,"thread":{"long":6030},"query":null},"op":"c","ts_ms":{"long":1589005893773},"transaction":null}
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":3,"name":"not work"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005900000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":11501,"row":0,"thread":{"long":6030},"query":null},"op":"c","ts_ms":{"long":1589005900724},"transaction":null}
I've also checked the Kafka Connect log; here is some of the output:
ythh@openstack2:~/kafka_2.12-2.5.0/logs$ cat connect.log | grep transform
transforms = []
transforms = []
...
[2020-05-09 14:29:30,470] INFO transform.unwrap.type = io.debezium.transforms.ExtractNewRecordState (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,470] INFO transform = unwrap (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,471] INFO transform.unwrap.drop.tombstones = false (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,471] INFO transform.unwrap.delete.handling.mode = rewrite (io.debezium.connector.common.BaseSourceTask:97)
transforms = []
transforms = []
[2020-05-09 14:29:32,419] INFO transform.unwrap.type = io.debezium.transforms.ExtractNewRecordState (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:32,419] INFO transform = unwrap (io.debezium.connector.common.BaseSourceTask:97)
transforms = []
transforms = []
...
It looks like you made a typo: "transform" instead of "transforms". Try this configuration:
{
    // omit ...
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
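For reference, a fuller source connector configuration with the SMT applied might look like the sketch below. The connector name and the `drop.tombstones` / `delete.handling.mode` options are taken from your own log output; everything else elided here depends on your setup:

```json
{
    "name": "mysql-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",

        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}
```

Note that the per-transform keys also use the plural prefix (`transforms.unwrap.*`), matching the `transforms` alias list. Once the corrected config is applied, the log line should read something like `transforms = [unwrap]` instead of `transforms = []`, and the messages in the topic should contain only the flattened row fields (e.g. `id`, `name`) rather than the full `before`/`after`/`source` envelope.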