Search code examples
mongodbsubdocument

How to upsert a subdocument atomically?


I have a document structure for storing application related data for each user device. The company has some number of applications available, which is limited and not get changed so often. So I designed the document using embedded subdocument array approach to reduce seek and make it suitable for aggregation pipeline.

Let one document defined:

{
    _id: "device_unique_identification_code",
    device_model: "iPhone5,2",
    applications: [
        {
            app_id: "a_game",
            push_token: "edbca9078d6c0c3a9f17166bbcf5be440c8ed2c6",
            last_user_id: 132522
        },
        {
            app_id: "an_app",
            push_token: "fed949982ceac84279f22a29bdd66cc13b7750e1",
            last_user_id: 132522
        },
        {
            app_id: "yet_another_game",
            push_token: "5cbf5a2bf0db7d6d55bd454222844d37d4f351b6",
            last_user_id: 842452
        },
        {
            app_id: "yet_another_app",
            push_token: "d1b60db7d54246d55bd37f4f35d45c2284b5a2bf",
            last_user_id: 842452
        }
    ]
}

This collection only stores device-app specific data, all session/user related is kept in another collection.

Since those applications are quite busy, I must use atomic command to do anything to reduce the risk of race condition.

Here is the question.

Given device "a" and application "b", store some device-app value (e.g., save a push_token) in a single atomic command.

Here are the test cases.

  • If there is no document of device "a", create one with device-app data.
  • If the document of device "a" already exists, but there is no app "b". Push a new device-app.
  • If the document of device "a" already exists and app "b" already exists. Update the existing one.

I wasted a few days trying various queries with upsert/addToSet/setOnInsert/etc. but still no clue.

PS. There are 2 more options I have thought about.

  • Use separated collections, this will work but with the trade of seek performance and I am feeling like it's a kind of RDBMS.
  • Use app_id as a map key to subdocument instead of array, this also works but I may loose the ability of aggregation pipeline and fallback to the (slower) map-reduce.

Answer Described

The solution is using optimistic locking with version field.

previousVersion = 0

while (true) {
    // find target document with current version number,
    // also create a new document with version 1 initially
    // also find whether there is an existing app_id
    // so we don't have to loop through the array
    doc = db.devices.findAndModify({
        query:{_id:"given_device_id"},
        update:{$setOnInsert:{version:1}},
        fields:{version:1,applications:{$elemMatch:{app_id:"given_app_id"}}},
        upsert:true,
        new:true})

    // prevent unexpected infinite loop
    if (previousVersion == doc['version']) {
        throw new InfiniteLoopExpectedException()
    }

    previousVersion = doc['version']

    if (doc contains applications) {
        // if document contains the target application
        // update it using $ positioning because I am too lazy to find the index
        result = db.devices.update(
            {
                _id:"given_device_id",
                version:doc['version'],
                "applications.app_id":"given_app_id"
            }, 
            {
                $inc:{version:1},
                $set:{"applications.$.push_token":"given_value"}
            })
    } else {
        // no app_id found ? simply push
        result = db.devices.update(
            {_id:"given_device_id",version:doc['version']},
            {
                $inc:{version:1},
                $push:{applications:{app_id:"given_app_id",push_token:"given_value"}}
            })
    }

    // if the update command failed, retry the process again
    if (result['nModified'] == 1) {
        break
    }
}

Solution

  • Did you try find and modify?

    If you add a field version you can ensure that an update proceeds only against the version of the record that you have from a query (version). If it doesn't go through you can re-read the document (gets version n+m), increment the version (n+m+1), reapply your changes and retry findandmodify matching the version number you just read (n+m). Eventually you will succeed and when you do you know that no other thread or process intervened between your read and your write.