Search code examples
mongodbapache-kafkaapache-kafka-connectdebezium

Debezium with MongoDB - Produced record's payload contains backslash


I'm implementing data extract using the debezium mongodb connector, building up upon the official documentation: https://debezium.io/documentation/reference/stable/connectors/mongodb.html

Everything is working quite fine - except that the payload contains backslash as you can see in the after attribute. Well, oddly enough, the attribute source is right.

{
  "after": "{\"_id\": {\"$oid\": \"63626d5993801d8fd1140993\"},\"document\": \"29973569000204\",\"document_type\": \"CNPJ\"}",
  "patch": null,
  "filter": null,
  "source": {
    "version": "1.7.1.Final",
    "connector": "mongodb",
    "name": "xxxxxxxxxx",
    "ts_ms": 8466513,
    "snapshot": "false",
    "db": "database",
    "sequence": null,
    "rs": "atlas-iurhise-shard-0",
    "collection": "mongo_collection",
    "ord": 1,
    "h": null,
    "tord": 4,
    "stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
  },
  "op": "c",
  "ts_ms": 1667394905422,
  "transaction": null
}

I tried this solution, but it doesn't work for me: Debezium Outbox Pattern property transforms.outbox.table.expand.json.payload not working

these are my settings:

{
  "name": "DebeziumDataExtract",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "3",
    "mongodb.hosts": "removed",
    "mongodb.name": "removed",
    "mongodb.user": "removed",
    "mongodb.password": "removed",
    "mongodb.ssl.enabled": "true",
    "collection.whitelist": "removed",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "hstore.handling.mode": "json",
    "decimal.handling.mode": "string",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "heartbeat.interval.ms": "1000",
    "heartbeat.topics.prefix": "removed",
    "topic.creation.default.replication.factor": 3,  
    "topic.creation.default.partitions": 1,  
    "topic.creation.default.cleanup.policy": "compact",  
    "topic.creation.default.compression.type": "lz4",
    "transforms": "unwrap",
    "transforms.unwrap.collection.expand.json.payload": "true"
  }
}

and waiting for a payload like this:

{
  "after": {
    "_id": {
      "$oid": "63626d5993801d8fd1140993"
    },
    "document": "29973585214796",
    "document_type": "CNPJ"
  },
  "patch": null,
  "filter": null,
  "source": {
    "version": "1.7.1.Final",
    "connector": "mongodb",
    "name": "xxxxxxxxxx",
    "ts_ms": 8466513,
    "snapshot": "false",
    "db": "database",
    "sequence": null,
    "rs": "atlas-iurhise-shard-0",
    "collection": "mongo_collection",
    "ord": 1,
    "h": null,
    "tord": 4,
    "stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
  },
  "op": "c",
  "ts_ms": 1667394905422,
  "transaction": null
}

Could someone help me?

########## UPDATES ##########

After @onecricketeer comments I tried this:

{
  "name": "DebeziumTransportPlanner",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "3",
    "mongodb.hosts": "stg-transport-planner-0-shard-00-00-00.xmapa.mongodb.net,stg-transport-planner-0-shard-00-01.xmapa.mongodb.net,stg-transport-planner-0-shard-00-02.xmapa.mongodb.net",
    "mongodb.name": "stg-transport-planner-01",
    "mongodb.user": "oploguser-stg",
    "mongodb.password": "vCh1NtV4PoY8PeSJ",
    "mongodb.ssl.enabled": "true",
    "collection.whitelist": "stg-transport-planner-01[.]aggregated_transfers",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "hstore.handling.mode": "json",
    "decimal.handling.mode": "string",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "heartbeat.interval.ms": "1000",
    "heartbeat.topics.prefix": "__debeziumtransport-planner-heartbeat",
    "topic.creation.default.replication.factor": 3,  
    "topic.creation.default.partitions": 1,  
    "topic.creation.default.cleanup.policy": "compact",  
    "topic.creation.default.compression.type": "lz4",
    "transforms": "unwrap",
    "transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.collection.expand.json.payload": "true",
    "transforms.unwrap.collection.fields.additional.placement": "route_external_id:header,transfer_index:header"
  }
}

Solution

  • You need to use JsonConverter instead of StringConverter if you want the data to be a JSON object rather than a String.

    Also, you are missing transforms.unwrap.type