Tags: mongodb, apache-kafka, apache-kafka-connect, mongodb-kafka-connector

In Kafka, the record "key" does not match the document "_id" after the document is updated in MongoDB


We are trying to copy all records from MongoDB to Kafka using com.mongodb.kafka.connect.MongoSourceConnector. The connector is configured as follows:

{
    "name": "mongo-source",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb:***:27017/?authSource=admin&replicaSet=myMongoCluster&authMechanism=SCRAM-SHA-256",
        "database": "someDb",
        "collection": "someCollection",
        "output.format.value":"json",
        "output.format.key":"json",
        "key.converter.schemas.enable":"false",
        "value.converter.schemas.enable":"false",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "publish.full.document.only": "true",
        "change.stream.full.document":"updateLookup",
        "copy.existing": "true"
    }
}

When the existing documents are initially copied from MongoDB to Kafka, the record "key" contains the "_id" of the MongoDB document:

{"_id": {"_id": {"$oid": "5e54fd0fbb5b5a7d35737232"}, "copyingData": true}}

But when a document in MongoDB is updated, the update arrives in Kafka with a different "key":

{"_id": {"_data": "82627B2EF6000000022B022C0100296E5A1004A47945EC361D42A083988C14D982069C46645F696400645F0FED2B3A35686E505F5ECA0004"}}

As a result, the consumer cannot match an update to the document that was copied initially.
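To make the mismatch concrete, here is a minimal consumer-side sketch in Python that tells the two key shapes shown above apart. It assumes the key arrives as the JSON string produced by this configuration; the function name classify_key and the labels it returns are hypothetical, not part of the connector. The "_data" value is the change stream resume token, which identifies the event rather than the document, so it cannot be mapped back to the document "_id".

```python
import json
from typing import Tuple


def classify_key(raw_key: str) -> Tuple[str, str]:
    """Return (kind, value) for a record key emitted by the source connector.

    Hypothetical helper: it only distinguishes the two key shapes
    quoted in the question.
    """
    inner = json.loads(raw_key)["_id"]
    if "_data" in inner:
        # Change stream event: the key carries the resume token,
        # not the document id.
        return ("resume_token", inner["_data"])
    # Initial copy: the key wraps the document's own _id.
    doc_id = inner["_id"]
    if isinstance(doc_id, dict) and "$oid" in doc_id:
        return ("document_id", doc_id["$oid"])
    return ("document_id", str(doc_id))


if __name__ == "__main__":
    copied = '{"_id": {"_id": {"$oid": "5e54fd0fbb5b5a7d35737232"}, "copyingData": true}}'
    print(classify_key(copied))
```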

Please help me find which settings on the Kafka, connector, or MongoDB side are responsible for this, and how I can make the "key" for updates the same as the one produced during the initial copy.


Solution

  • We were facing the same issue, and after some searching we started using the following config. We defined an Avro schema for the key (the "output.schema.key" setting) so that the key is extracted from the document id. The key is generated consistently and looks like

    Struct{fullDocument._id=e2ce4bfe-d03a-4192-830d-895df5a4b095}

    Here "e2ce4bfe-d03a-4192-830d-895df5a4b095" is the document id.

    {
      "change.stream.full.document" : "updateLookup",
      "connection.uri" : "<connection_uri>",
      "connector.class" : "com.mongodb.kafka.connect.MongoSourceConnector",
      "collection": "someCollection",
      "copy.existing" : "true",
      "database" : "someDb",
      "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable" : "false",
      "key.serializer" : "org.apache.kafka.connect.storage.StringConverter",
      "name" : "mongo-source",
      "output.format.key" : "schema",
      "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
      "publish.full.document.only" : "true",
      "output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"fullDocument._id\",\"type\":\"string\"}]}",
      "tasks.max" : "1",
      "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable" : "false"
    }
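On the consumer side, the document id can then be pulled out of the key string. The following is a minimal Python sketch, assuming the key always has exactly the "Struct{fullDocument._id=...}" form shown above; the helper name document_id_from_key is hypothetical, and the pattern would need adjusting if the key schema had more fields.

```python
import re

# Matches the single-field key format shown in the answer, e.g.
# Struct{fullDocument._id=e2ce4bfe-d03a-4192-830d-895df5a4b095}
_KEY_PATTERN = re.compile(r"^Struct\{fullDocument\._id=(?P<id>[^,}]+)\}$")


def document_id_from_key(raw_key: str) -> str:
    """Extract the document id from a stringified key (hypothetical helper)."""
    match = _KEY_PATTERN.match(raw_key.strip())
    if match is None:
        # Fail loudly rather than guess at an unknown key layout.
        raise ValueError(f"unexpected key format: {raw_key!r}")
    return match.group("id")


if __name__ == "__main__":
    key = "Struct{fullDocument._id=e2ce4bfe-d03a-4192-830d-895df5a4b095}"
    print(document_id_from_key(key))
```

Because the same key is produced for the initial copy and for later updates, a consumer can use the extracted id directly to match an update to the document it already holds.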