Search code examples
mongodbgoevent-handlingmgomongo-go

Watch for MongoDB Change Streams


We want our Go application to listen to the data changes on a collection. So, googling in search for a solution, we came across MongoDB's Change Streams. That link also exhibits some implementation snippets for a bunch of languages such as Python, Java, Nodejs etc. Yet, there is no piece of code for Go.

We are using Mgo as a driver but could not find explicit statements on change streams.

Does anyone have any idea on how to watch on Change Streams using that Mgo or any other Mongo driver for Go?


Solution

  • The popular mgo driver (github.com/go-mgo/mgo) developed by Gustavo Niemeyer has gone dark (unmaintained). And it has no support for change streams.

    The community supported fork github.com/globalsign/mgo is in much better shape, and has already added support for change streams (see details here).

    To watch changes of a collection, simply use the Collection.Watch() method which returns you a value of mgo.ChangeStream. Here's a simple example using it:

    coll := ... // Obtain collection
    
    pipeline := []bson.M{}
    
    changeStream := coll.Watch(pipeline, mgo.ChangeStreamOptions{})
    var changeDoc bson.M
    for changeStream.Next(&changeDoc) {
        fmt.Printf("Change: %v\n", changeDoc)
    }
    
    if err := changeStream.Close(); err != nil {
        return err
    }
    

    Also note that there is an official MongoDB Go driver under development, it was announced here: Considering the Community Effects of Introducing an Official MongoDB Go Driver

    It is currently in alpha (!!) phase, so take this into consideration. It is available here: github.com/mongodb/mongo-go-driver. It also already has support for change streams, similarly via the Collection.Watch() method (this is a different mongo.Collection type, it has nothing to do with mgo.Collection). It returns a mongo.Cursor which you may use like this:

    var coll mongo.Collection = ... // Obtain collection
    
    ctx := context.Background()
    
    var pipeline interface{} // set up pipeline
    
    cur, err := coll.Watch(ctx, pipeline)
    if err != nil {
        // Handle err
        return
    }
    defer cur.Close(ctx)
    
    for cur.Next(ctx) {
        elem := bson.NewDocument()
        if err := cur.Decode(elem); err != nil {
            log.Fatal(err)
        }
    
        // do something with elem....
    }
    
    if err := cur.Err(); err != nil {
        log.Fatal(err)
    }