Tags: python, mongodb, pymongo, changestream

How to listen for changes to a specific field of a nested array in MongoDB change streams?


This is the structure of the BSON document I have in MongoDB.

{
    "tournament_id": "P1oi12mwj10b1b",
    "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "",
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        },
        {
            "date_order": 2,
            "matches": [
                {
                    "match_id": "2B4k0sp29",
                    "time_order": 1,
                    "win": "",
                    "team1": "manU",
                    "team2": "manC"
                },
                {
                    "match_id": "4A4i0sp31",
                    "time_order": 2,
                    "win": "",
                    "team1": "chelsea",
                    "team2": "arsenal"
                }
            ]
        }
    ]
}

I want to build a notification system that sends a notification whenever a match completes. In other words, whenever the value of the win field changes, I want to know which match was updated. I'm using MongoDB change streams.

For example, if the match with match_id 3A4j0sp26 has just completed, I want to print that object.

{
      "match_id": "3A4j0sp26",
      "time_order": 2,
      "win": "team2",
      "team1": "rma",
      "team2": "sev",
      # If possible I also want to find these fields,
      "tournament_id": "P1oi12mwj10b1b",
      "date_order": 1
}

I tried doing this.

import pymongo
from bson.json_util import dumps

MONGO_URI = 'mongodb://localhost/mydb'
client = pymongo.MongoClient(MONGO_URI)

filters = []  # How to correctly set this filter ???
'''
What I already tried but failed
filters = [{
        '$match': {
            '$and': [
                {'updateDescription.updatedFields.matches': {'$exists': 'true'}},  # This line needs fixing.
                {'operationType': {'$in': ['replace', 'update']}}
            ]
        }
    }]
'''


change_stream = client.mydb.match.watch(filters)
for change in change_stream:
    print(dumps(change))

I tried debugging without applying any filters. I updated the win field of match_id 3A4j0sp26 to team2 and got this result.

{
  "_id": {
    "_data": "8261252C2F000000012B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
  },
  "operationType": "replace",
  "clusterTime": {
    "$timestamp": {
      "t": 1629826095,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$oid": "61228ae88cf6743d054b8cef"
    },
    "tournament_id": "P1oi12mwj10b1b",
    "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",    # This was updated earlier. I don't want this.
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "team1",     # This is the most recently updated.
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        }]
   }
}

It shows every element of the array, not just the one that was updated.

Edited

This is the result I got after updating only the "score" field.

{
  "_id": {
    "_data": "8261254598000000022B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
  },
  "operationType": "update",
  "clusterTime": {
    "$timestamp": {
      "t": 1629832600,
      "i": 2
    }
  },
  "ns": {
    "db": "mydb",
    "coll": "match"
  },
  "documentKey": {
    "_id": {
      "$oid": "61228ae88cf6743d054b8cef"
    }
  },
  "updateDescription": {
    "updatedFields": {
       "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "team1",
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        }]
    },
    "removedFields": []
  }
}

Solution

  • It depends on how you do the update.

    A short test to demonstrate:

    Insert a document and start a change stream

    PRIMARY> db.updtest.insert({list:[
                             {item:"1",state:"running"},
                             {item:"2",state:"done"},
                             {item:"3",state:"unknown"}
                      ]});
    
    WriteResult({ "nInserted" : 1 })
    
    PRIMARY> let stream = db.updtest.watch()
    

    Updating by setting the list field results in a change event that returns the entire array:

    PRIMARY> db.updtest.updateOne({},{$set:{list:[
                             {item:"1",state:"running"},
                             {item:"2",state:"done"},
                             {item:"3",state:"running"}
                      ]}});
    
    { "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
    
    PRIMARY> stream.next();
    
    {
        "_id" : {
            "_data" : "82612577BE000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
        },
        "operationType" : "update",
        "clusterTime" : Timestamp(1629845438, 1),
        "ns" : {
            "db" : "test",
            "coll" : "updtest"
        },
        "documentKey" : {
            "_id" : ObjectId("6125779d98787c286c544305")
        },
        "updateDescription" : {
            "updatedFields" : {
                "list" : [
                    {
                        "item" : "1",
                        "state" : "running"
                    },
                    {
                        "item" : "2",
                        "state" : "done"
                    },
                    {
                        "item" : "3",
                        "state" : "running"
                    }
                ]
            },
            "removedFields" : [ ]
        }
    }
    

    Updating just one field in one subdocument results in a change event that includes only the modified field:

    PRIMARY> db.updtest.update({"list.item":"3"},{$set:{"list.$.state":"done"}});
    
    WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
    
    PRIMARY> stream.next();
    
    {
        "_id" : {
            "_data" : "8261257879000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
        },
        "operationType" : "update",
        "clusterTime" : Timestamp(1629845625, 1),
        "ns" : {
            "db" : "test",
            "coll" : "updtest"
        },
        "documentKey" : {
            "_id" : ObjectId("6125779d98787c286c544305")
        },
        "updateDescription" : {
            "updatedFields" : {
                "list.2.state" : "done"
            },
            "removedFields" : [ ]
        }
    }
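
For the nested schema in the question, the same kind of targeted update could be written in Python roughly as follows. This is a minimal sketch, not the asker's actual code: build_win_update and set_winner are hypothetical helper names, and it assumes PyMongo 3.6+ and a server that supports array filters (MongoDB 3.6+). It uses the all-positional operator $[] for the outer array and a filtered positional operator $[m] for the inner one, so only the win field of one match is written. The resulting change event should then list a single dotted path such as matches.0.matches.1.win instead of the whole array.

```python
def build_win_update(tournament_id, match_id, winner):
    """Build (filter, update, array_filters) for a targeted update of one match.

    Hypothetical helper: field names follow the document in the question.
    """
    # Select the tournament document that contains the match.
    query = {"tournament_id": tournament_id,
             "matches.matches.match_id": match_id}
    # $[] walks every date group; $[m] only touches inner elements
    # that satisfy the array filter below.
    update = {"$set": {"matches.$[].matches.$[m].win": winner}}
    array_filters = [{"m.match_id": match_id}]
    return query, update, array_filters


def set_winner(collection, tournament_id, match_id, winner):
    """Apply the targeted update to a pymongo Collection (assumed argument)."""
    query, update, array_filters = build_win_update(
        tournament_id, match_id, winner)
    return collection.update_one(query, update, array_filters=array_filters)
```

A call such as set_winner(client.mydb.match, "P1oi12mwj10b1b", "3A4j0sp26", "team2") would replace an update that rewrites the whole matches array.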
    

    If you also open the change stream with the option that returns the full document (fullDocument: "updateLookup"), each update event carries the whole document as well, which gives you the context around the field that was changed.
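
Putting the two pieces together for the question's schema, a listener could look roughly like the sketch below. It assumes the win field is written with a targeted update, so that updatedFields contains a dotted path such as matches.0.matches.1.win, and that the stream is opened with full_document="updateLookup". The names WIN_PATH, PIPELINE, completed_matches and watch_completed are illustrative, not part of PyMongo. The server-side pipeline is an optional pre-filter; the client-side parsing does the actual work. Note that updateLookup returns the document as it is at lookup time, so the array indexes are only reliable if the arrays are not reordered between the update and the lookup.

```python
import re

# Dotted path produced by a targeted update, e.g. "matches.0.matches.1.win".
WIN_PATH = re.compile(r"^matches\.(\d+)\.matches\.(\d+)\.win$")

# Optional server-side pre-filter (assumption: only targeted updates matter).
PIPELINE = [
    {"$match": {"operationType": "update"}},
    {"$addFields": {"changed": {
        "$objectToArray": "$updateDescription.updatedFields"}}},
    {"$match": {"changed.k": {
        "$regex": r"^matches\.\d+\.matches\.\d+\.win$"}}},
]


def completed_matches(change):
    """Return the match subdocuments whose win field was set in this event."""
    updated = change.get("updateDescription", {}).get("updatedFields", {})
    doc = change.get("fullDocument") or {}
    results = []
    for path, value in updated.items():
        hit = WIN_PATH.match(path)
        if not hit or not value:
            continue  # not a win field, or win was reset to ""
        try:
            day = doc["matches"][int(hit.group(1))]
            match = dict(day["matches"][int(hit.group(2))])
        except (KeyError, IndexError):
            continue  # full document missing or changed since the update
        match["win"] = value
        match["tournament_id"] = doc.get("tournament_id")
        match["date_order"] = day.get("date_order")
        results.append(match)
    return results


def watch_completed(collection):
    """Print every completed match seen on a pymongo Collection."""
    with collection.watch(PIPELINE, full_document="updateLookup") as stream:
        for change in stream:
            for match in completed_matches(change):
                print(match)
```

Calling watch_completed(client.mydb.match) would block and print one dictionary per completed match, including tournament_id and date_order taken from the full document.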