I'm implementing a data extract using the Debezium MongoDB connector, building on the official documentation: https://debezium.io/documentation/reference/stable/connectors/mongodb.html
Everything is working fine, except that the payload contains backslashes, as you can see in the `after` attribute: its value is an escaped JSON string rather than a nested JSON object. Oddly enough, the `source` attribute is rendered correctly as an object.
{
"after": "{\"_id\": {\"$oid\": \"63626d5993801d8fd1140993\"},\"document\": \"29973569000204\",\"document_type\": \"CNPJ\"}",
"patch": null,
"filter": null,
"source": {
"version": "1.7.1.Final",
"connector": "mongodb",
"name": "xxxxxxxxxx",
"ts_ms": 8466513,
"snapshot": "false",
"db": "database",
"sequence": null,
"rs": "atlas-iurhise-shard-0",
"collection": "mongo_collection",
"ord": 1,
"h": null,
"tord": 4,
"stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
},
"op": "c",
"ts_ms": 1667394905422,
"transaction": null
}
I tried the solution from the question "Debezium Outbox Pattern property transforms.outbox.table.expand.json.payload not working", but it doesn't work for me.
These are my settings:
{
"name": "DebeziumDataExtract",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "3",
"mongodb.hosts": "removed",
"mongodb.name": "removed",
"mongodb.user": "removed",
"mongodb.password": "removed",
"mongodb.ssl.enabled": "true",
"collection.whitelist": "removed",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"hstore.handling.mode": "json",
"decimal.handling.mode": "string",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"heartbeat.interval.ms": "1000",
"heartbeat.topics.prefix": "removed",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"transforms": "unwrap",
"transforms.unwrap.collection.expand.json.payload": "true"
}
}
I am expecting a payload like this, with `after` as a nested object:
{
"after": {
"_id": {
"$oid": "63626d5993801d8fd1140993"
},
"document": "29973585214796",
"document_type": "CNPJ"
},
"patch": null,
"filter": null,
"source": {
"version": "1.7.1.Final",
"connector": "mongodb",
"name": "xxxxxxxxxx",
"ts_ms": 8466513,
"snapshot": "false",
"db": "database",
"sequence": null,
"rs": "atlas-iurhise-shard-0",
"collection": "mongo_collection",
"ord": 1,
"h": null,
"tord": 4,
"stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
},
"op": "c",
"ts_ms": 1667394905422,
"transaction": null
}
Could someone help me?
########## UPDATES ##########
After @onecricketeer's comments, I tried this:
{
"name": "DebeziumTransportPlanner",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "3",
"mongodb.hosts": "stg-transport-planner-0-shard-00-00-00.xmapa.mongodb.net,stg-transport-planner-0-shard-00-01.xmapa.mongodb.net,stg-transport-planner-0-shard-00-02.xmapa.mongodb.net",
"mongodb.name": "stg-transport-planner-01",
"mongodb.user": "oploguser-stg",
"mongodb.password": "vCh1NtV4PoY8PeSJ",
"mongodb.ssl.enabled": "true",
"collection.whitelist": "stg-transport-planner-01[.]aggregated_transfers",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"hstore.handling.mode": "json",
"decimal.handling.mode": "string",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"heartbeat.interval.ms": "1000",
"heartbeat.topics.prefix": "__debeziumtransport-planner-heartbeat",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"transforms": "unwrap",
"transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.collection.expand.json.payload": "true",
"transforms.unwrap.collection.fields.additional.placement": "route_external_id:header,transfer_index:header"
}
}
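You need to use `JsonConverter` instead of `StringConverter` as the `value.converter` if you want the data to be a JSON object rather than a String.
Also, you are missing `transforms.unwrap.type`; without it, Kafka Connect does not know which transformation class the `unwrap` alias refers to, so the `transforms.unwrap.*` options have no transform to apply to.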