I'm implementing a notifications system, where I have a notifications
collection with the following schema:
{
user_id: Number,
message: String,
count: Number,
timestamp: Date
}
Whenever there are 2 or more consecutive notifications for the same user with the same message
, I want to prevent storing all of them. Instead, I just want to increment count
by 1.
Here is the node.js code that handles inserting new notifications with the approach described above:
async function createNotification(user_id, message) {
const notifications = db.collection('notifications');
const [lastNotification] = await notifications.find({ user_id })
.sort({ timestamp: -1 }).limit(1).toArray();
if(lastNotifcation?.message === message) {
await notifications.updateOne(
{ _id: lastNotifcation._id },
{ count: lastNotifcation.count+1 },
)
} else {
await notifications.insertOne({ user_id, message, count: 1, timestamp: new Date() });
}
}
While the code above works, it is not atomic (it isn't a single operation). How can I achieve the logic above with one operation instead of two?
While I believe wrapping your existing work in transactions should be the option that involves minimal change, you can opt for leveraging the aggregation pipeline for all the work.
$match
to check for existing data$sort
+ $limit: 1
to get last record$set
+ $add
to increment counter$setUnion
with your default data for "insert template"$setWindowFields
to compute $rank
for the user_id
partition
sortBy
timestamp
to compute rank
$$NOW
current timestamp of "insert template"$match
to pick up only our rank: 1
data and $unset
to clean up the rank
field$merge
to upsert back to the collectiondb.notifications.aggregate([
{
"$match": {
// your input user_id and message to upsert here
"user_id": 1,
"message": "update case - count should be incremented"
}
},
{
"$sort": {
"timestamp": -1
}
},
{
"$limit": 1
},
{
"$set": {
"count": {
"$add": [
"$count",
1
]
}
}
},
{
"$unionWith": {
"coll": "notifications",
"pipeline": [
{
"$documents": [
// document to insert if not found
{
"user_id": 1,
"message": "msg to insert",
"count": 1,
"timestamp": "$$NOW"
}
]
}
]
}
},
{
"$setWindowFields": {
"partitionBy": "$user_id",
"sortBy": {
"timestamp": 1
},
"output": {
"rank": {
"$rank": {}
}
}
}
},
{
"$match": {
"rank": 1
}
},
{
"$unset": "rank"
},
{
"$merge": {
"into": "notifications",
"on": [
"user_id",
"message"
]
}
}
])
Mongo Playground for matched / update count case
Mongo Playground for unmatched / insert case
Mongo Playground for no record / insert case