Search code examples
mongodbapache-kafka-connectmongodb-kafka-connector

Using mongo-kafka as sink connector, how do I map a topic record's value field to another value?


I'm new to both Kafka Connect and MongoDB. I have a record in a Kafka topic with a value of { "Id": "foo" } and I would like the Id to map to BAR when stored as a document in a collection in mongo. Expected result to be { "BAR": "foo" }. What should I try, or how do I configure to do this?

I'm using this as reference: https://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md

I've tried adding "field.renamer.mapping": "[{\"oldName\":\"Id\", \"newName\": \"BAR\"}]" and "field.renamer.mapping": "[{\"oldName\":\"value.Id\", \"newName\": \"BAR\"}]"to the config similar to the mongo sink creation here: https://github.com/mongodb/mongo-kafka/blob/11bac7636f0d6b0e3313c84445777253d36c2042/docker/run.sh#L108 . The request goes through without errors, creates the record in mongodb, but it isn't mapping as expected.


Solution

  • You should be able to use a Single Message Transform (https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/), specifically the ReplaceField transform. You would add this to your connector config:

    "transforms": "RenameField",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "Id:BAR"