
MongoDB: update only if the desired document is the last one, otherwise insert


I'm implementing a notifications system, where I have a notifications collection with the following schema:

{
    user_id: Number,
    message: String,
    count: Number,
    timestamp: Date
}

Whenever there are two or more consecutive notifications for the same user with the same message, I don't want to store each of them. Instead, I just want to increment count by 1 on the existing notification.

Here is the node.js code that handles inserting new notifications with the approach described above:

async function createNotification(user_id, message) {
    const notifications = db.collection('notifications');
    const [lastNotification] = await notifications.find({ user_id })
        .sort({ timestamp: -1 }).limit(1).toArray();

    if (lastNotification?.message === message) {
        await notifications.updateOne(
            { _id: lastNotification._id },
            { $inc: { count: 1 } }, // updateOne needs an update operator, not a bare document
        );
    } else {
        await notifications.insertOne({ user_id, message, count: 1, timestamp: new Date() });
    }
}

While the code above works, it is not atomic (it isn't a single operation). How can I achieve the logic above with one operation instead of two?


Solution

  • Wrapping your existing logic in a transaction is probably the option that requires the least change. Alternatively, you can do all of the work in a single aggregation pipeline:

    1. $match to check for existing data
    2. $sort + $limit: 1 to get last record
    3. $set + $add to increment counter
    4. $unionWith your default data as the "insert template"
    5. $setWindowFields to compute $rank for the user_id partition
      • sortBy timestamp to compute rank
      • if there is existing data from steps 2 and 3, it will have rank: 1, since its timestamp is earlier than the $$NOW timestamp of the "insert template"
      • if there is no existing data, the "insert template" will have rank: 1
    6. $match to pick up only our rank: 1 data and $unset to clean up the rank field
    7. $merge to upsert back to the collection
    db.notifications.aggregate([
      {
        "$match": {
          // your input user_id and message to upsert here
          "user_id": 1,
          "message": "update case - count should be incremented"
        }
      },
      {
        "$sort": {
          "timestamp": -1
        }
      },
      {
        "$limit": 1
      },
      {
        "$set": {
          "count": {
            "$add": [
              "$count",
              1
            ]
          }
        }
      },
      {
        "$unionWith": {
          "coll": "notifications",
          "pipeline": [
            {
              "$documents": [
                // document to insert if not found; in real use, pass the same user_id and message as in $match
                {
                  "user_id": 1,
                  "message": "msg to insert",
                  "count": 1,
                  "timestamp": "$$NOW"
                }
              ]
            }
          ]
        }
      },
      {
        "$setWindowFields": {
          "partitionBy": "$user_id",
          "sortBy": {
            "timestamp": 1
          },
          "output": {
            "rank": {
              "$rank": {}
            }
          }
        }
      },
      {
        "$match": {
          "rank": 1
        }
      },
      {
        "$unset": "rank"
      },
      {
        "$merge": {
          "into": "notifications",
          "on": [
            "user_id",
            "message"
          ]
        }
      }
    ])
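
The pipeline above hardcodes user_id and message. As a minimal sketch, it could be built from Node.js like this, so both the $match and the "insert template" use the same inputs. The helper name buildNotificationPipeline is made up for illustration and is not part of the driver; the stages simply mirror the ones above.

```javascript
// Hypothetical helper: returns the upsert pipeline for one notification.
// The stages mirror the shell pipeline above, with the inputs parameterized.
function buildNotificationPipeline(user_id, message) {
  return [
    // Steps 1-2: find the latest matching notification, if any
    { $match: { user_id, message } },
    { $sort: { timestamp: -1 } },
    { $limit: 1 },
    // Step 3: increment its counter
    { $set: { count: { $add: ["$count", 1] } } },
    // Step 4: append the "insert template"
    {
      $unionWith: {
        coll: "notifications",
        pipeline: [
          { $documents: [{ user_id, message, count: 1, timestamp: "$$NOW" }] },
        ],
      },
    },
    // Steps 5-6: keep only the earliest document per user, then drop the rank
    {
      $setWindowFields: {
        partitionBy: "$user_id",
        sortBy: { timestamp: 1 },
        output: { rank: { $rank: {} } },
      },
    },
    { $match: { rank: 1 } },
    { $unset: "rank" },
    // Step 7: write the result back
    { $merge: { into: "notifications", on: ["user_id", "message"] } },
  ];
}

// Usage (assumes `db` is a connected Db instance from the Node.js driver):
//   await db.collection("notifications")
//     .aggregate(buildNotificationPipeline(1, "hello"))
//     .toArray(); // the cursor must be consumed for the pipeline to run
```

Keep in mind that $merge requires a unique index on the fields listed in on, so this assumes one exists on user_id and message.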
    

    Mongo Playground for matched / update count case
    Mongo Playground for unmatched / insert case
    Mongo Playground for no record / insert case
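
For the transaction option mentioned at the top of this answer, a rough sketch with the Node.js driver might look like the following. It assumes client is a connected MongoClient against a replica set or sharded cluster (transactions are not available on a standalone server) and that the connection string names a default database. The function name createNotificationTx is made up for illustration.

```javascript
// Sketch only: the original find-then-write logic, run inside a transaction.
// Assumptions: `client` is a connected MongoClient, and client.db() resolves
// to the database that holds the notifications collection.
async function createNotificationTx(client, user_id, message) {
  const notifications = client.db().collection("notifications");
  const session = client.startSession();
  try {
    await session.withTransaction(async () => {
      // Every operation must be given the session to be part of the transaction
      const [last] = await notifications
        .find({ user_id }, { session })
        .sort({ timestamp: -1 })
        .limit(1)
        .toArray();

      if (last?.message === message) {
        await notifications.updateOne(
          { _id: last._id },
          { $inc: { count: 1 } },
          { session }
        );
      } else {
        await notifications.insertOne(
          { user_id, message, count: 1, timestamp: new Date() },
          { session }
        );
      }
    });
  } finally {
    await session.endSession();
  }
}
```

As far as I know, this makes the read and the write commit together, but two concurrent calls that both take the insert branch would not conflict with each other, so it may still need a unique index or similar guard if that case matters.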