
How can I listen in real time for changes in my MongoDB database?


I have a database that I am sending data to. At the same time, I'm running a Python script, and I would like this script to print the latest entry to my console as soon as it is added to the MongoDB database.

I've been looking for a solution for days without finding anything.

I did some research and found: a) tailable cursors, but my collection is not capped, and since I will be inserting data every 5 seconds, I'm afraid a capped collection would not be enough for my needs, because once the maximum size is reached it starts overwriting the oldest records; b) change streams, but my database is not a replica set. I'm fairly new to this, so I still have to learn about more advanced topics like replica sets (RS).


This is what i got so far:

from pymongo import MongoClient
import pymongo
import time
import random
from pprint import pprint

#Step 1: Connect to MongoDB - Note: Change connection string as needed
client = MongoClient(port=27017)


db = client.one

mycol = client["coll"]


highest_previous_primary_key = 1

while True:
    cursor = db.mycol.find()
    for document in cursor:

        # get the current primary key, and if it's greater than the previous one
        # we print the results and increment the variable to that value
        current_primary_key = document['num']
        if current_primary_key > highest_previous_primary_key:
            print(document['num'])
            highest_previous_primary_key = current_primary_key

But the problem with this is that it stops printing after the 4th record. I also don't know whether it's the best solution once my database holds a lot of data.

Any advice?


Solution

  • b) change_streams, but my db is not a replica set, i'm fairly new to this so i still have to learn about more advanced topics like RS

    Replica sets provide redundancy and high availability, and are the basis for all production MongoDB deployments. Having said that, for testing and/or development you can deploy a replica set with only a single member. For local testing, for example:

    mongod --dbpath /path/data/test --replSet test
    

    Once the local test server has started, connect with the mongo shell and run rs.initiate():

    mongo
    > rs.initiate()
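
If you would rather stay in Python, the same initiation step can probably be done from PyMongo. This is only a sketch under a few assumptions: the set name "test" matches the --replSet value above, the server listens on localhost:27017, and your PyMongo version is recent enough to accept the directConnection option. The helper names single_member_config and initiate_single_member_set are made up for this example.

```python
def single_member_config(set_name="test", host="localhost:27017"):
    """Build a replica set config document with exactly one member.

    set_name must match the --replSet value passed to mongod.
    """
    return {"_id": set_name, "members": [{"_id": 0, "host": host}]}


def initiate_single_member_set(set_name="test", host="localhost:27017"):
    """Send replSetInitiate to a mongod that has not been initiated yet."""
    # Imported here so that building the config does not require PyMongo.
    from pymongo import MongoClient

    # directConnection=True talks to this one server directly instead of
    # trying to discover a replica set that does not exist yet.
    client = MongoClient(host, directConnection=True)
    try:
        return client.admin.command(
            "replSetInitiate", single_member_config(set_name, host)
        )
    finally:
        client.close()
```

Calling initiate_single_member_set() once against the freshly started mongod should have the same effect as rs.initiate() in the shell. Calling it again on an already initiated set is expected to raise an error from the server.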
    

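    See also Deploy a Replica Set for Testing and Development in the MongoDB manual. If you are familiar with Docker, running the replica set with docker or docker-compose may be easier.

    Once the replica set is initiated, you can open a change stream with PyMongo: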

    import logging

    import pymongo

    client = pymongo.MongoClient(port=27017)

    try:
        # Only catch insert operations
        with client.watch([{'$match': {'operationType': 'insert'}}]) as stream:
            for insert_change in stream:
                print(insert_change)
    except pymongo.errors.PyMongoError:
        # The ChangeStream encountered an unrecoverable error or the
        # resume attempt failed to recreate the cursor.
        logging.error('...')
    

    See also pymongo.mongo_client.MongoClient.watch()
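
Since the question asks for just the newly added entry, it may be more convenient to watch a single collection and print only the inserted document rather than the whole change event. Below is a minimal sketch of that idea, not a definitive implementation. It assumes the data lives in the collection the question queries (db.mycol in database "one"), and the helper names insert_only_pipeline and watch_inserts are invented for this example.

```python
def insert_only_pipeline():
    """Aggregation pipeline that keeps only insert events."""
    return [{"$match": {"operationType": "insert"}}]


def watch_inserts(collection, handle=print, resume_after=None):
    """Call handle(document) for each document inserted into collection.

    Blocks while the change stream is open. If the stream ends, the last
    resume token seen is returned so that a later call can continue from
    that point via resume_after.
    """
    last_token = resume_after
    with collection.watch(insert_only_pipeline(),
                          resume_after=last_token) as stream:
        for change in stream:
            # Insert events carry the new document under "fullDocument".
            handle(change["fullDocument"])
            # The event's "_id" field is its resume token.
            last_token = change["_id"]
    return last_token


# Example usage (assumes the single-member replica set is running):
#
#   import pymongo
#   client = pymongo.MongoClient(port=27017)
#   watch_inserts(client.one.mycol, handle=lambda doc: print(doc["num"]))
```

Compared with the polling loop in the question, this does not rescan the collection on every pass, so the cost should not grow with the amount of stored data.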