Tags: mongodb, mongodb-query, nosql, aggregation-framework, nosql-aggregation

Removing blocked users from the pipeline with lookup in mongodb


In MongoDB, I have a "conversations2" collection where I keep chat data and a "user_blocked" collection where I keep the users blocked by the system. When fetching the chats for a userId, I need to exclude the conversations whose otherUserId appears in user_blocked. My query below works, but it is too slow for users with a large number of conversations (for example, there are 450 thousand records for the user with ID 32). Is there any way to speed this up, or to get the same result by a different route? My goal is to get the user's unread message count, excluding anyone who is blocked. By the way, conversations2 has the indexes userId_1 and otherUserId_1, and user_blocked has the index id_1. Thanks in advance for your help.

db.conversations2.aggregate([
  {
    $match: {
      userId: 32
    }
  },
  {
    $lookup: {
      from: "user_blocked",
      localField: "otherUserId",
      foreignField: "id",
      as: "blockedUsers"
    }
  },
  {
    $match: {
      blockedUsers: {
        $eq: []
      }
    }
  },
  {
    $group: {
      _id: "$userId",
      unreadMessageCount: {
        $sum: "$unreadMessageCount"
      }
    }
  }
])

Example document from conversations2:

{
  "_id": {
    "$oid": "65c0f64030054c4b8f0481a0"
  },
  "otherUserId": {
    "$numberLong": "45"
  },
  "userId": {
    "$numberLong": "32"
  },
  "lastMessage": "test",
  "lastMessageTime": {
    "$date": "2024-02-21T10:36:44.592Z"
  },
  "lastMessageType": 1,
  "lastMessageWay": "in",
  "unreadMessageCount": 29
}

Example document from user_blocked:

{
  "_id": {
    "$oid": "66033f989bba279fe7d0862a"
  },
  "id": {
    "$numberLong": "45"
  }
}

Solution

    1. Only check for unread > 0

    Since this query only returns unreadMessageCount, the first minor optimisation is to add that condition to your first $match stage. A conversation with 0 unread messages leaves the total unchanged whether the other user is blocked or not, so it can be skipped before the $lookup.

    {
      $match: {
        userId: 32,
        unreadMessageCount: { $gt: 0 }
      }
    }
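
To see why the extra condition is safe, here is a minimal in-memory sketch in plain JavaScript. The three documents are made up for illustration and only mimic the shape of conversations2; `sumUnread` is a hypothetical helper that does the same arithmetic as the $group stage.

```javascript
// Toy documents (made up for illustration), shaped like conversations2.
const conversations = [
  { userId: 32, otherUserId: 45, unreadMessageCount: 29 },
  { userId: 32, otherUserId: 46, unreadMessageCount: 0 },
  { userId: 32, otherUserId: 47, unreadMessageCount: 10 },
];

// Same arithmetic as the $group stage: add up unreadMessageCount.
const sumUnread = (docs) =>
  docs.reduce((total, doc) => total + doc.unreadMessageCount, 0);

const totalAll = sumUnread(conversations);
const totalNonZero = sumUnread(
  conversations.filter((doc) => doc.unreadMessageCount > 0)
);

console.log(totalAll, totalNonZero); // 39 39
```

The filtered list is shorter, so every later stage has fewer documents to process, while the sum stays the same.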
    

    2. Uncorrelated Subquery with $lookup

    2A.

    We can also reduce the number of $lookups that actually run by using an uncorrelated subquery with $lookup, i.e. one that runs only once rather than once per document.

    db.conversations2.aggregate([
      {
        $match: {
          userId: 32,
          unreadMessageCount: { $gt: 0 }
        }
      },
      {
        // this will only run once
        $lookup: {
          from: "user_blocked",
          pipeline: [
            {
              $group: {
                _id: null,
                ids: { $addToSet: "$id" }
              }
            }
          ],
          as: "blockedUsers"
        }
      },
      {
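        $set: {
          // fall back to [] when user_blocked is empty; otherwise
          // $indexOfArray below returns null and every document is dropped
          blockedUsers: { $ifNull: [{ $first: "$blockedUsers.ids" }, []] }
        }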
      },
      {
        $match: {
          $expr: {
            $eq: [
              { $indexOfArray: ["$blockedUsers", "$otherUserId"] },
              -1
            ]
          }
        }
      },
      {
        $group: {
          _id: "$userId",
          unreadMessageCount: {
            $sum: "$unreadMessageCount"
          }
        }
      }
    ])
    

    Mongo Playground with the correct result of unread 10
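
As a rough mental model of pipeline 2A, the same logic can be sketched in plain JavaScript over in-memory arrays. This is only an illustration under assumptions: `unreadExcludingBlocked` is a hypothetical name, the sample data is made up, and a `Set` stands in for the array built by $addToSet.

```javascript
// In-memory stand-in for pipeline 2A (toy data, not the real collections).
function unreadExcludingBlocked(conversations, blockedDocs, userId) {
  // Built once, like the uncorrelated $lookup with $group/$addToSet.
  const blockedIds = new Set(blockedDocs.map((doc) => doc.id));

  return conversations
    // First $match: this user's conversations with unread messages.
    .filter((c) => c.userId === userId && c.unreadMessageCount > 0)
    // Set membership plays the role of the $indexOfArray === -1 check.
    .filter((c) => !blockedIds.has(c.otherUserId))
    // Final $group: sum the unread counts.
    .reduce((total, c) => total + c.unreadMessageCount, 0);
}

const sampleConversations = [
  { userId: 32, otherUserId: 45, unreadMessageCount: 29 },
  { userId: 32, otherUserId: 46, unreadMessageCount: 0 },
  { userId: 32, otherUserId: 47, unreadMessageCount: 10 },
  { userId: 33, otherUserId: 47, unreadMessageCount: 5 },
];
const sampleBlocked = [{ id: 45 }];

console.log(unreadExcludingBlocked(sampleConversations, sampleBlocked, 32)); // 10
```

The point of the sketch is that the blocked-id collection is read a single time, no matter how many conversations the user has.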

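    However, if your user_blocked collection is too big, the collected array of ids may push a document past the 16MB BSON document size limit (roughly 1.3 mil ids for Ints, ~600k for Longs), and the stage building it may also hit the 100MB per-stage memory limit unless you pass allowDiskUse: true. In that case, use variation B below: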

    2B.

    This aggregation pipeline reduces the total number of lookups needed and is less likely to run into size limits, though it still can. Steps:

    • Collect all the otherUserId values into a single array
    • Do a one-time lookup on user_blocked
    • Retain only the remaining validOtherIds
    • Do a self-lookup on conversations2, but only for the validOtherIds

    db.conversations2.aggregate([
      {
        $match: {
          userId: 32,
          unreadMessageCount: { $gt: 0 }
        }
      },
      {
        // collect all the unique `otherUserId` for this userId
        $group: {
          _id: null,
          otherUserIds: { $addToSet: "$otherUserId" }
        }
      },
      {
        // correlated but will only run once since
        // the previous stage has only one document
        $lookup: {
          from: "user_blocked",
          let: {
            lookupOtherUserIds: "$otherUserIds"
          },
          pipeline: [
            {
              $match: {
                $expr: {
                  $ne: [
                    { $indexOfArray: ["$$lookupOtherUserIds", "$id"] },
                    -1
                  ]
                }
              }
            },
            {
              $group: {
                _id: null,
                ids: { $addToSet: "$id" }
              }
            }
          ],
          as: "blockedUsers"
        }
      },
      {
        // otherUserIds which remain after removing the blocked ids
        $project: {
          validOtherIds: {
            $setDifference: [
              "$otherUserIds",
              // fall back to [] when none of them are blocked
              { $ifNull: [{ $first: "$blockedUsers.ids" }, []] }
            ]
          }
        }
      },
      {
        // do a self-lookup on `conversations2`
        $lookup: {
          from: "conversations2",
          let: {
            lookupValidOtherIds: "$validOtherIds"
          },
          // repeat part of the first stage of this pipeline, yuck!
          pipeline: [
            {
              $match: {
                userId: 32,
                // unread > 0 check is not needed since
                // lookupValidOtherIds already has only > 0's
                $expr: {
                  $ne: [
                    { $indexOfArray: ["$$lookupValidOtherIds", "$otherUserId"] },
                    -1
                  ]
                }
              }
            }
          ],
          as: "validConvos"
        }
      },
      // the `group` below can be directly done in the self-lookup stage
      // but I find this cleaner
      { $unwind: "$validConvos" },
      {
        $group: {
          _id: null,
          unreadMessageCount: {
            $sum: "$validConvos.unreadMessageCount"
          }
        }
      }
    ])
    

    Mongo Playground
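
The comment near the end of the 2B pipeline mentions that the final $group can be done directly inside the self-lookup. A sketch of that variant follows, written as a plain JavaScript array so it can be spread into the pipeline. `tailStages` is a hypothetical name, and the stages are an untested assumption about how the last three stages (self-lookup, $unwind, $group) could be replaced; the earlier stages stay as they are.

```javascript
// Assumed replacement for the last three stages of pipeline 2B.
const tailStages = [
  {
    $lookup: {
      from: "conversations2",
      let: { lookupValidOtherIds: "$validOtherIds" },
      pipeline: [
        {
          $match: {
            userId: 32,
            $expr: {
              $ne: [
                { $indexOfArray: ["$$lookupValidOtherIds", "$otherUserId"] },
                -1
              ]
            }
          }
        },
        // sum inside the sub-pipeline, so at most one small
        // document is returned to the outer pipeline
        {
          $group: {
            _id: null,
            unreadMessageCount: { $sum: "$unreadMessageCount" }
          }
        }
      ],
      as: "validConvos"
    }
  },
  {
    // pull the single sum out of the array; 0 when nothing matched
    $project: {
      _id: 0,
      unreadMessageCount: {
        $ifNull: [{ $first: "$validConvos.unreadMessageCount" }, 0]
      }
    }
  }
];
```

In mongosh this would be used as `db.conversations2.aggregate([...firstStages, ...tailStages])`, where `firstStages` (also a hypothetical name) holds the first four stages of the pipeline above.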

    3. ‼ No lookups, Add a field otherUserBlocked

    This optimisation will require a data/structure change but is the most scalable and most performant:

    Add a field like otherUserBlocked: true/false and index it. Initially default it to false (no users blocked) and then set it to true using a similar pipeline to the one you already have.

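    If you skip the initial defaults, unblocked conversations will have no otherUserBlocked field at all, so the queries below must also match documents where the field is missing, for example with otherUserBlocked: { $ne: true } instead of otherUserBlocked: false.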
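
The one-off backfill could look roughly like the sketch below, written for mongosh. It is an assumption-laden illustration rather than a tested migration: `backfillBlockedFlag` and `batchSize` are hypothetical names, it swaps the "similar pipeline" for a simpler `distinct` plus `$in` update, and the compound index is one possible choice for the fields used in the $match. Note that `distinct` returns all ids in a single result, so a very large user_blocked collection would need a cursor-based loop instead.

```javascript
// One-off migration sketch for mongosh; `db` is the database handle.
// backfillBlockedFlag and batchSize are hypothetical names.
function backfillBlockedFlag(db, batchSize = 10000) {
  // 1. Default every conversation to "not blocked".
  db.conversations2.updateMany({}, { $set: { otherUserBlocked: false } });

  // 2. Flag conversations whose other party is in user_blocked, in
  //    batches so that no single $in list grows too large.
  const blockedIds = db.user_blocked.distinct("id");
  for (let i = 0; i < blockedIds.length; i += batchSize) {
    db.conversations2.updateMany(
      { otherUserId: { $in: blockedIds.slice(i, i + batchSize) } },
      { $set: { otherUserBlocked: true } }
    );
  }

  // 3. Index the fields used by the $match stage: the two equality
  //    fields first, then the range field.
  db.conversations2.createIndex({
    userId: 1,
    otherUserBlocked: 1,
    unreadMessageCount: 1,
  });

  // Number of blocked ids that were processed.
  return blockedIds.length;
}
```

In mongosh it would be run once as `backfillBlockedFlag(db)`, ideally while no block or unblock operations are being written.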

    Each time a user gets blocked, you're already adding them to the user_blocked collection. Add another step that also updates conversations2 with the filter { otherUserId: blocked_user_id } and sets otherUserBlocked: true. Something like:

    db.conversations2.updateMany({
      otherUserId: 46
    },
    {
      $set: {
        otherUserBlocked: true
      }
    })
    

    And if they get unblocked, set it back to false with the same filter.
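
Both directions can share one small helper. This is a minimal sketch for mongosh, assuming it is called right after your existing code adds to or removes from user_blocked; `syncBlockedFlag` is a hypothetical name, not part of any MongoDB API.

```javascript
// Hypothetical helper: keeps conversations2.otherUserBlocked in sync
// with user_blocked. `db` is the mongosh database handle.
function syncBlockedFlag(db, otherUserId, isBlocked) {
  return db.conversations2.updateMany(
    // $ne skips documents that already hold the right value and also
    // matches documents where the field is still missing.
    { otherUserId: otherUserId, otherUserBlocked: { $ne: isBlocked } },
    { $set: { otherUserBlocked: isBlocked } }
  );
}
```

Usage would be `syncBlockedFlag(db, 46, true)` when user 46 is blocked and `syncBlockedFlag(db, 46, false)` when they are unblocked.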

    Your aggregation pipeline can then use this in the first $match stage and eliminates the need for the $lookup completely, as well as the second $match stage. The pipeline becomes:

    db.conversations2.aggregate([
      {
        $match: {
          userId: 32,
          otherUserBlocked: false,
          unreadMessageCount: { $gt: 0 }
        }
      },
      {
        $group: {
          _id: "$userId",
          unreadMessageCount: { $sum: "$unreadMessageCount" }
        }
      }
    ])
    

    Mongo Playground with the new field

    Both of these changes will also be useful when you want to actually show the unread messages, and not just the count.

    More about MongoDB Schema Design best practices.