Tags: mongodb, apache-kafka, apache-kafka-connect, confluent-platform

How to get the full document when using the Kafka MongoDB source connector to track update operations on a collection?


I am using the Kafka MongoDB Source Connector [https://www.confluent.io/hub/mongodb/kafka-connect-mongodb] with Confluent Platform v5.4. Below is my MongoDB source connector config:

{
    "name": "mongodb-replica-set-connector",
    "config": {
        "tasks.max": 1,
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.uri": "mongodb://<username:password>@<MongoDB-Server-IP-Or-DNS>/<DB-Name>?ssl=false&authSource=<DB-Name>&retryWrites=true&w=majority",
        "database": "<DB-Name>",
        "collection": "<Collection-Name>",
        "topic.prefix": ""
    }
}
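For reference, a connector config like the one above is usually registered with the Kafka Connect REST API. A minimal sketch in Python, assuming a Connect worker at `http://localhost:8083` (that endpoint, and the placeholder values carried over from the config above, are assumptions to adjust for your environment):

```python
import json
from urllib import request

# Connector payload mirroring the config above (placeholders kept as-is).
connector = {
    "name": "mongodb-replica-set-connector",
    "config": {
        "tasks.max": 1,
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.uri": "mongodb://<username:password>@<MongoDB-Server-IP-Or-DNS>/<DB-Name>",
        "database": "<DB-Name>",
        "collection": "<Collection-Name>",
        "topic.prefix": "",
    },
}

def register_connector(worker_url: str, payload: dict) -> request.Request:
    """Build a POST request for the Connect REST API's /connectors endpoint."""
    return request.Request(
        f"{worker_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Usage (only run against a live Connect worker):
# request.urlopen(register_connector("http://localhost:8083", connector))
```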

I get the full and correct document when a record is inserted into the specified collection, but when I perform a delete or update operation, I do not get the full document. Below is a screenshot of the delete and update output from a stream that reads the topic specified in the config.

[screenshot: stream output for delete and update operations]

My questions are: What should I specify in the config so that I get the full document when an update operation is performed? And is there any way to get information such as the id or key of the document that was deleted?


Solution

  • Set the property `"publish.full.document.only": "true"` in the MongoDB source connector config to get the full document whenever a create or update operation is performed on the MongoDB collection. Delete operations cannot be published this way, because the document no longer exists at that point, so there is no full document to look up; the raw change event for a delete carries only the document key (its `_id`), not the document body. This fits the CDC (change data capture) model: only changes to data (creates/updates) can be captured with their full content.
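Applied to the config from the question, the fix is one added property (placeholders kept as-is):

```json
{
    "name": "mongodb-replica-set-connector",
    "config": {
        "tasks.max": 1,
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.uri": "mongodb://<username:password>@<MongoDB-Server-IP-Or-DNS>/<DB-Name>?ssl=false&authSource=<DB-Name>&retryWrites=true&w=majority",
        "database": "<DB-Name>",
        "collection": "<Collection-Name>",
        "topic.prefix": "",
        "publish.full.document.only": "true"
    }
}
```

With this set, update events should publish the current, looked-up version of the document rather than just the change description.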