Search code examples
arrays mongodb mongodb-query mongoskin

Mongodb check for duplicate records for bulk insertions


My collection has records like:

{ "_id":"1", "field1":"foo","field2":"xyz", "field3":"something" ...}
{ "_id":"2", "field1":"bar","field2":"xyz", "field3":"something" ...}
{ "_id":"3", "field1":"foo","field2":"abc", "field3":"something" ...}
{ "_id":"4", "field1":"bar","field2":"lmn", "field3":"something" ...}

Before inserting a new record I need to check whether a record with the same field1 and field2 values already exists, and discard the request if it does. I can manage this when I am inserting one record at a time. How do I handle it when I'm doing a bulk insert (i.e. when I'm inserting an array of documents)?

I have an array of [field1, field2] combinations which I need to look up, e.g.:

queryArray=[ { "field1":"foo","field2":"xyz"},
             { "field1":"bar","field2":"lmn"} ]

Expected result:

result=[  { "_id":"1", "field1":"foo","field2":"xyz", "field3":"something" ...},
          { "_id":"4", "field1":"bar","field2":"lmn", "field3":"something" ...}]

Solution

  • Create a unique compound index on both fields

    // Unique compound index over field1 + field2: an insert whose (field1, field2) pair already exists is rejected.
    db.collection.createIndex( { "field1": 1, "field2": 1 }, { "unique": true } )
    

    Use the insertMany() method to do the bulk insert, but set the ordered option to false, as this ensures that all write operations are attempted even if some of them fail. Ordered operations stop after the first error, while unordered operations continue to process any remaining write operations in the queue:

    // Documents to insert; neither one supplies an _id.
    var queryArray = [
        { "field1": "foo", "field2": "xyz" },
        { "field1": "bar", "field2": "lmn" }
    ];

    // Unordered insert: a failing document does not stop the remaining writes.
    try {
        db.collection.insertMany(queryArray, { "ordered": false });
    } catch (e) {
        // Print whatever the insert threw instead of aborting the script.
        print(e);
    }
    

    This will output a document

    {
        "acknowledged" : true,
        "insertedIds" : [ 
            ObjectId("57443e6fa58e5654f3a6c5ae"), 
            ObjectId("57443e6fa58e5654f3a6c5af")
        ]
    }
    

    The resulting document contains the field acknowledged, which is true if the operation ran with write concern or false if write concern was disabled, and the field insertedIds, an array holding the _id of each successfully inserted document.

    Because the documents in queryArray did not include _id, mongod creates and adds the _id field for each document and assigns it a unique ObjectId value. And since you enforced uniqueness on the two fields field1 and field2, any document that repeats an existing pair is rejected with a duplicate key error; because the operation was unordered, it still continues to process any remaining write operations.


    Suppose you had omitted the ordered option (by default it is set to true); you would then get the following output from the operation:

    // Same two documents as before, again without an _id.
    var queryArray = [
        { "field1": "foo", "field2": "xyz" },
        { "field1": "bar", "field2": "lmn" }
    ];

    // No options object is passed, so insertMany runs with its default settings.
    try {
        db.collection.insertMany(queryArray);
    } catch (e) {
        // Print the thrown error object.
        print(e);
    }
    

    Console Output:

    {
        "name" : "BulkWriteError",
        "message" : "write error at item 0 in bulk operation",
        "ok" : undefined,
        "nInserted" : 0,
        "nUpserted" : 0,
        "nMatched" : 0,
        "nModified" : 0,
        "nRemoved" : 0,
        "getUpsertedIds" : function () {
          return bulkResult.upserted;
        },
        "getUpsertedIdAt" : function (index) {
          return bulkResult.upserted[index];
        },
        "getRawResponse" : function () {
          return bulkResult;
        },
        "hasWriteErrors" : function () {
          return bulkResult.writeErrors.length > 0;
        },
        "getWriteErrorCount" : function () {
          return bulkResult.writeErrors.length;
        },
        "getWriteErrorAt" : function (index) {
          if(index < bulkResult.writeErrors.length) {
            return bulkResult.writeErrors[index];
          }
          return null;
        },
        "getWriteErrors" : function () {
          return bulkResult.writeErrors;
        },
        "hasWriteConcernError" : function () {
          return bulkResult.writeConcernErrors.length > 0;
        },
        "getWriteConcernError" : function () {
          if(bulkResult.writeConcernErrors.length == 0) {
            return null;
          } else if(bulkResult.writeConcernErrors.length == 1) {
            // Return the error
            return bulkResult.writeConcernErrors[0];
          } else {
    
            // Combine the errors
            var errmsg = "";
            for(var i = 0; i < bulkResult.writeConcernErrors.length; i++) {
              var err = bulkResult.writeConcernErrors[i];
              errmsg = errmsg + err.errmsg;
              // TODO: Something better
              if (i != bulkResult.writeConcernErrors.length - 1) {
                errmsg = errmsg + " and ";
              }
            }
    
            return new WriteConcernError({ errmsg : errmsg, code : WRITE_CONCERN_FAILED });
          }
        },
        "tojson" : function (indent, nolint) {
          return tojson(bulkResult, indent, nolint);
        },
        "toString" : function () {
          return "BulkWriteError(" + this.tojson() + ")";
        },
        "shellPrint" : function () {
          return this.toString();
        },
        "hasErrors" : function () {
          return this.hasWriteErrors() || this.hasWriteConcernError();
        },
        "toSingleResult" : function () {
          if(singleBatchType == null) throw Error(
              "Cannot output single WriteResult from multiple batch result");
          return new WriteResult(bulkResult, singleBatchType, writeConcern);
        },
        "stack" : "BulkWriteError({\n\t\"writeErrors\" : [\n\t\t{\n\t\t\t\"index\" : 0,\n\t\t\t\"code\" : 11000,\n\t\t\t\"errmsg\" : \"E11000 duplicate key error index: test.collection.$field1_1_field2_1 dup key: { : \\\"foo\\\", : \\\"xyz\\\" }\",\n\t\t\t\"op\" : {\n\t\t\t\t\"_id\" : ObjectId(\"574441aea58e5654f3a6c5b6\"),\n\t\t\t\t\"field1\" : \"foo\",\n\t\t\t\t\"field2\" : \"xyz\"\n\t\t\t}\n\t\t}\n\t],\n\t\"writeConcernErrors\" : [ ],\n\t\"nInserted\" : 0,\n\t\"nUpserted\" : 0,\n\t\"nMatched\" : 0,\n\t\"nModified\" : 0,\n\t\"nRemoved\" : 0,\n\t\"upserted\" : [ ]\n})\nBulkWriteError@src/mongo/shell/bulk_api.js:372:44\nBulkWriteResult/this.toError@src/mongo/shell/bulk_api.js:335:16\nBulk/this.execute@src/mongo/shell/bulk_api.js:1162:1\nDBCollection.prototype.insertMany@src/mongo/shell/crud_api.js:279:5\n@(shell):1:7\n",
        "toResult" : function () {
          return new BulkWriteResult(bulkResult, singleBatchType, writeConcern);
        }
    }
    

    With emphasis on the returned write error:

    "E11000 duplicate key error index: test.collection.$field1_1_field2_1 dup key: { : \"foo\", : \"xyz\" }"
    

    Apart from the insertMany() method, you could also try the Bulk() API methods where in particular you need to call the initializeUnorderedBulkOp() method to do an unordered bulk insert after creating the unique compound index.

    Consider the following example for the above case:

    // Inserts every document of queryArray into 'collectionName' through unordered
    // bulk operations, sending one batch for every BATCH_SIZE queued documents.
    db.collection('collectionName', function(err, collection) {
        if (err) {
            // No collection handle was obtained, so there is nothing to insert into.
            console.error(err);
            return;
        }

        var BATCH_SIZE = 1000;
        var bulk = collection.initializeUnorderedBulkOp();
        var counter = 0; // local on purpose, so no global variable is created

        // Sends one queued batch and reports its outcome.
        function executeBatch(batch) {
            batch.execute(function(err, result) {
                // Duplicate-key failures caused by the unique index are presumably
                // reported through err -- confirm against the driver version in use.
                if (err) {
                    console.error(err);
                }
                if (result) {
                    console.log(result);
                }
            });
        }

        queryArray.forEach(function (doc) {
            bulk.insert(doc);
            counter++;

            if (counter % BATCH_SIZE === 0) {
                // forEach is synchronous, so the execute callback cannot run before the
                // next insert. Swap in a fresh bulk op *before* executing the full one;
                // otherwise later inserts are queued on an already-executed operation.
                var fullBatch = bulk;
                bulk = collection.initializeUnorderedBulkOp();
                executeBatch(fullBatch);
            }
        });

        // Flush the final, partially filled batch. Nothing is sent when the last
        // batch is empty (including the case of an empty queryArray).
        if (counter % BATCH_SIZE !== 0) {
            executeBatch(bulk);
        }
    });