I'm attempting to setup a MongoDB Sink Connector via Confluent Cloud which keeps data synced between pgsql and MongoDB.
I'm expecting the config below to update an existing document based on the id
(int) field (not _id - objectId), however it just creates a new document in MongoDB when consumed. Documents from pg will not contain the _id field, therefore we need the lookup done on our pgsql primary key (id).
Any ideas why this isn't working as I would expect?
{
"connector.class": "MongoDbAtlasSink",
"name": "mongodb-sink",
"kafka.api.key": "mykey",
"kafka.api.secret": "mysecret",
"input.data.format": "JSON",
"topics":"mytopic",
"connection.host": "myhost",
"connection.user": "myuser",
"connection.password": "mypassword",
"database": "mydatabase",
"delete.on.null.values": "false",
"tasks.max": "1",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"id",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"
}
key.projection.type
and value.projection.type
are apparently unsupported in the cloud version, so it won't work with that config.