Insert many based on upserting condition with Pymongo


I am trying to find an efficient way to upload a Pandas DataFrame to a MongoDB collection with the following constraints:

  • If a document with the same 'business_id' and 'document_key' (the two fields that together identify a document) already exists, overwrite it

  • If no document with that 'business_id' and 'document_key' exists, create one

I tried with:

from pymongo import UpdateOne

upserts = [
    UpdateOne(
        {"$and": [
            {"business_id": x["business_id"]},
            {"document_key": x["document_key"]},
        ]},
        {"$setOnInsert": x},
        upsert=True,
    )
    for x in dd.to_dict("records")
]

result = collection.bulk_write(upserts)

But it does not seem to update existing documents, and it does not follow the overwrite/create rules described above.

How can I perform the insertion in line with the two bullet points above?


Solution

  • I suspect you want $set instead of $setOnInsert. From the MongoDB documentation for $setOnInsert:

    If an update operation with upsert: true results in an insert of a document, then $setOnInsert assigns the specified values to the fields in the document. If the update operation does not result in an insert, $setOnInsert does nothing.

    https://docs.mongodb.com/manual/reference/operator/update/setOnInsert/
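
To make the quoted behaviour concrete, here is a toy in-memory model of a single upsert. It is only a sketch: `apply_upsert` is a hypothetical helper, not a PyMongo function, and it models nothing beyond equality filters and the two operators discussed here.

```python
def apply_upsert(collection, flt, update):
    """Toy model of one update with upsert=True.

    `collection` is a plain list of dicts. Only equality filters and the
    $set / $setOnInsert operators are modelled (an assumption made for
    illustration; a real server supports far more).
    """
    for doc in collection:
        if all(doc.get(k) == v for k, v in flt.items()):
            # A document matched: $set is applied, $setOnInsert is ignored.
            doc.update(update.get("$set", {}))
            return "matched"
    # Nothing matched: insert a new document from the filter plus both operators.
    new_doc = dict(flt)
    new_doc.update(update.get("$setOnInsert", {}))
    new_doc.update(update.get("$set", {}))
    collection.append(new_doc)
    return "upserted"


existing = [{"business_id": 3, "document_key": 3, "Existing": True}]
row = {"business_id": 3, "document_key": 3, "Updated": True}
key = {"business_id": 3, "document_key": 3}

# Same starting data, two different update operators.
with_set_on_insert = [dict(d) for d in existing]
apply_upsert(with_set_on_insert, key, {"$setOnInsert": row})

with_set = [dict(d) for d in existing]
apply_upsert(with_set, key, {"$set": row})

print(with_set_on_insert)  # the matched document is left untouched
print(with_set)            # the matched document gains 'Updated': True
```

In this model the $setOnInsert call leaves the existing document unchanged, which matches the symptom in the question: rows whose keys are already in the collection are never updated.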

    Worked example using $set:

    import pandas as pd
    from pymongo import MongoClient, UpdateOne
    
    db = MongoClient()['mydatabase']
    collection = db['mycollection']
    
    collection.insert_many([{'business_id': x, 'document_key': x, 'Existing': True} for x in range(10)])
    df = pd.DataFrame([{'business_id': x, 'document_key': x, 'Updated': True} for x in range(3, 6)])
    
    upserts = [
        UpdateOne(
            {'business_id': x['business_id'],
             "document_key": x["document_key"]},
            {'$set': x},
            upsert=True
        )
        for x in df.to_dict("records")
    ]
    
    result = collection.bulk_write(upserts)
    
    print(f'Matched: {result.matched_count}, Upserted: {result.upserted_count}, Modified: {result.modified_count}')
    
    for document in collection.find({}, {'_id': 0}):
        print(document)
    

    prints:

    Matched: 3, Upserted: 0, Modified: 3
    {'business_id': 0, 'document_key': 0, 'Existing': True}
    {'business_id': 1, 'document_key': 1, 'Existing': True}
    {'business_id': 2, 'document_key': 2, 'Existing': True}
    {'business_id': 3, 'document_key': 3, 'Existing': True, 'Updated': True}
    {'business_id': 4, 'document_key': 4, 'Existing': True, 'Updated': True}
    {'business_id': 5, 'document_key': 5, 'Existing': True, 'Updated': True}
    {'business_id': 6, 'document_key': 6, 'Existing': True}
    {'business_id': 7, 'document_key': 7, 'Existing': True}
    {'business_id': 8, 'document_key': 8, 'Existing': True}
    {'business_id': 9, 'document_key': 9, 'Existing': True}
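
Note that documents 3 to 5 above still carry 'Existing': True, because $set merges the new fields into the matched document rather than replacing it. If "overwrite" is meant to replace the whole document, PyMongo's ReplaceOne with upsert=True may be a better fit. The following is a minimal sketch under that assumption; `build_replace_specs` and `to_replace_ops` are hypothetical helper names, and the sample record stands in for `df.to_dict("records")`.

```python
KEY_FIELDS = ("business_id", "document_key")  # the two identifying fields from the question


def build_replace_specs(records, key_fields=KEY_FIELDS):
    """Return one (filter, replacement) pair per record."""
    return [({k: rec[k] for k in key_fields}, dict(rec)) for rec in records]


def to_replace_ops(specs):
    """Wrap each pair in a pymongo ReplaceOne with upsert=True."""
    # Imported lazily so build_replace_specs can be tried without pymongo installed.
    from pymongo import ReplaceOne
    return [ReplaceOne(flt, doc, upsert=True) for flt, doc in specs]


# Stand-in for df.to_dict("records")
records = [{"business_id": 3, "document_key": 3, "Updated": True}]
specs = build_replace_specs(records)
print(specs)

# With a live collection (not executed here):
# result = collection.bulk_write(to_replace_ops(specs))
```

With ReplaceOne, a matched document keeps its _id but every other field is replaced by the new record, so 'Existing' would no longer appear on documents 3 to 5.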