Search code examples
mongodbapache-kafka-connectmongodb-kafka-connector

How do I increment a field with UpdateOneTimestamps writemodel strategy


I want to continuously update a document's field named value {'_id': 'count', 'value':0} by a certain number.

My MongoSinkConnector has

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

I'm using a python script to produce messages to the appropiate topic

        self._aio_producer.produce(
            topic='mongo',
            value=json.dumps(
                {
                    "_id":"count",
                    "$inc":{"value":len(task['payload'].split(','))}
                 }
            )
        )

But I get this error on the Kafka Connect standalone process:

Failed to put into the sink the following records: [SinkRecord{kafkaOffset=8174, timestampType=CreateTime} ConnectRecord{topic='mongo', kafkaPartition=1, key=null, keySchema=null, value={_id=count, $inc={value=1}}, valueSchema=null, timestamp=1697153679938, headers=ConnectHeaders(headers=)}] 
(com.mongodb.kafka.connect.sink.MongoSinkTask:244) 
com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=52, message=The dollar ($) prefixed field '$inc' in '$inc' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., details={} 

I've tried dropping the $inc part, but it seems to just be replacing the document over and over without incrementing the value. Is there any way to increment a value or do I have to write my own custom Class?


Solution

  • Currently the way is to create your own Custom WriteModel Strategy.

        BsonDocument setOnInsertFields =
            new BsonDocument().append(A_FIELD_NAME, aValue).append(B_FIELD_NAME, bValue);
    
        BsonDocument incFields =
            new BsonDocument()
                .append(C_COUNT_FIELD_NAME, cType)
                .append(D_COUNT_FIELD_NAME, DType);
    
        // Create new document with specific update operations
        // such as set or setOnInsert, inc, etc
        BsonDocument newDocument =
            new BsonDocument().append("$setOnInsert", setOnInsertFields).append("$inc", incFields);