mongodb apache-kafka apache-kafka-connect debezium cdc

How to do type conversion when transferring data from MongoDB to Kafka with Debezium?


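In MongoDB, our IDs are not ObjectIds but binary UUIDs stored as BinData subtype 3, which the shell displays as Base64. I'm streaming these documents to Kafka using Debezium, and the IDs arrive as Base64 strings. How can I get them written to Kafka as UUID strings?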

Example MongoDB document:

{
    "_id" : BinData(3,"8D/JiwMtkEKSfrfKsxUe+g=="),
    "Version" : 5,
    "CreatedAt" : ISODate("2021-09-22T00:24:43.939+03:00"),
    "UpdatedAt" : ISODate("2021-09-22T00:32:53.096+03:00"),
    "AbidikId" : BinData(3,"CVebG2sIf0OtxnUNZIl39g=="),
    "GubidikId" : BinData(3,"U06d2Rk4nUG7Fz3iASM9LQ=="),
    "IsActive" : true,
    "BrandList" : [ "Sony2", "SUNY2" ],
    "CategoryIdList" : [ ]
}

Resulting Kafka message:

{
    "_id": "8D/JiwMtkEKSfrfKsxUe+g==",
    "Version": 5,
    "CreatedAt": 1632259483939,
    "UpdatedAt": 1632259973096,
    "AbidikId": "CVebG2sIf0OtxnUNZIl39g==",
    "GubidikId": "U06d2Rk4nUG7Fz3iASM9LQ==",
    "IsActive": true,
    "BrandList": [
        "Sony2",
        "SUNY2"
    ],
    "CategoryIdList": []
}

What I expect in the Kafka message (ID fields only):

"_id" : "8bc93ff0-2d03-4290-927e-b7cab3151efa",
"AbidikId": "1b9b5709-086b-437f-adc6-750d648977f6",
"GubidikId": "d99d4e53-3819-419d-bb17-3de201233d2d"
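For reference, the expected UUIDs are not a straight hex dump of the decoded bytes. Decoding "8D/JiwMtkEKSfrfKsxUe+g==" gives the bytes f0 3f c9 8b 03 2d 90 42 92 7e b7 ca b3 15 1e fa, and the expected 8bc93ff0-2d03-4290-927e-b7cab3151efa is obtained by reversing the first 4 bytes, the next 2 and the next 2, which is the legacy C#/.NET GUID byte order. The Java sketch below shows that conversion. It assumes every ID field is a 16-byte value in that layout; the class and method names (CsuuidDemo, base64ToCsuuid) are illustrative and not part of Debezium or Kafka Connect.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

class CsuuidDemo {

    // Illustrative helper: converts a Base64-encoded 16-byte BinData(3) value,
    // assumed to be in C# legacy GUID byte order, into a canonical UUID string.
    static String base64ToCsuuid(String base64) {
        byte[] b = Base64.getDecoder().decode(base64);
        if (b.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + b.length);
        }
        // .NET stores Data1 (4 bytes), Data2 (2 bytes) and Data3 (2 bytes) little-endian;
        // reverse each group to get the big-endian order used in the textual UUID form.
        byte[] r = b.clone();
        reverse(r, 0, 4);
        reverse(r, 4, 6);
        reverse(r, 6, 8);
        // The last 8 bytes are already in display order.
        ByteBuffer buf = ByteBuffer.wrap(r); // ByteBuffer reads big-endian by default
        return new UUID(buf.getLong(), buf.getLong()).toString();
    }

    // Reverses the bytes in the half-open range [from, to).
    private static void reverse(byte[] a, int from, int to) {
        for (int i = from, j = to - 1; i < j; i++, j--) {
            byte tmp = a[i];
            a[i] = a[j];
            a[j] = tmp;
        }
    }

    public static void main(String[] args) {
        System.out.println(base64ToCsuuid("8D/JiwMtkEKSfrfKsxUe+g==")); // 8bc93ff0-2d03-4290-927e-b7cab3151efa
        System.out.println(base64ToCsuuid("CVebG2sIf0OtxnUNZIl39g==")); // 1b9b5709-086b-437f-adc6-750d648977f6
        System.out.println(base64ToCsuuid("U06d2Rk4nUG7Fz3iASM9LQ==")); // d99d4e53-3819-419d-bb17-3de201233d2d
    }
}
```

If some IDs were written with the standard subtype 4 layout instead, no byte swapping would be needed for those fields, so it is worth checking which representation the writing application used.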

I couldn't find a built-in way to do this in Debezium, and I can't change the IDs in MongoDB. Thank you in advance.


Solution

  • We solved this problem with a custom SMT (Single Message Transform).

    1. First, we created a Java project from this repo: https://github.com/confluentinc/kafka-connect-insert-uuid

    2. Implement your conversion logic in the transform (in our case, Base64 to UUID). The names of the fields to convert can be read from the connector config.

    3. Build the JAR with Maven.

    4. Build a Docker image that includes the JAR so the Kafka Connect worker can load it.

    5. Use that image for your Connect worker and configure the transform and its fields in your Debezium connector config:

      "transforms": "unwrap,Reroute,convertguid,insertKey",
      "transforms.convertguid.type": "com.example.kafka.connect.smt.Base64ToCsuuid$Value",
      "transforms.convertguid.csuuid.field.names": "_id,examplefield1,examplefield2",

    6. Recreate the Debezium connector.

    With this in place, the MongoDB IDs are converted to UUID strings as they are streamed to Kafka. I hope this helps someone who needs it.
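The per-record logic of such a transform can be sketched in plain Java. This is not the authors' Base64ToCsuuid class: the Kafka Connect plumbing (the Transformation interface, the $Key/$Value variants and the ConfigDef) from the template repo is omitted, and the sketch assumes the record value reaches the transform schemaless, as a java.util.Map. A real SMT would also need a Struct/schema branch if the converter emits schemas. CsuuidFieldConverter is a hypothetical name; its constructor argument mirrors the comma-separated csuuid.field.names setting, and the byte-order swap assumes the C# legacy GUID layout.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

class CsuuidFieldConverter {

    private final List<String> fieldNames;

    // fieldNamesConfig mirrors a setting such as "_id,examplefield1,examplefield2".
    CsuuidFieldConverter(String fieldNamesConfig) {
        this.fieldNames = Arrays.stream(fieldNamesConfig.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Returns a copy of the record value with the configured fields converted.
    // Fields that are missing or not strings are left untouched.
    Map<String, Object> apply(Map<String, Object> value) {
        Map<String, Object> updated = new LinkedHashMap<>(value);
        for (String field : fieldNames) {
            Object raw = updated.get(field);
            if (raw instanceof String) {
                updated.put(field, base64ToCsuuid((String) raw));
            }
        }
        return updated;
    }

    // Decodes 16 Base64 bytes in assumed C# legacy order and returns a UUID string.
    static String base64ToCsuuid(String base64) {
        byte[] b = Base64.getDecoder().decode(base64);
        if (b.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + b.length);
        }
        // Swap Data1 (bytes 0-3), Data2 (4-5) and Data3 (6-7) from little- to big-endian.
        byte[] r = {b[3], b[2], b[1], b[0], b[5], b[4], b[7], b[6],
                    b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]};
        ByteBuffer buf = ByteBuffer.wrap(r);
        return new UUID(buf.getLong(), buf.getLong()).toString();
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("_id", "8D/JiwMtkEKSfrfKsxUe+g==");
        value.put("Version", 5);
        value.put("AbidikId", "CVebG2sIf0OtxnUNZIl39g==");
        value.put("GubidikId", "U06d2Rk4nUG7Fz3iASM9LQ==");
        CsuuidFieldConverter converter = new CsuuidFieldConverter("_id,AbidikId,GubidikId");
        System.out.println(converter.apply(value));
    }
}
```

Returning a copy instead of mutating the input keeps the sketch in line with how Connect transforms usually build a new record value rather than modifying the original.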