MongoDB change stream returns empty fullDocument on insert


I'm using MongoDB 4.4 with the corresponding Go driver. The database's replica set runs locally at localhost:27017 and localhost:27020. I also tried an Atlas sandbox cluster, which gave the same results.

According to the MongoDB documentation, when a new document is inserted, the fullDocument field of the event is supposed to contain the newly inserted document, but for some reason that is not the case for me. The ns field, which should hold the database and collection names, and the documentKey field, which stores the affected document's _id, are empty as well. The operationType field contains the correct operation type. In another test, update operations did not appear in the change stream at all.

It used to work as expected, but now it doesn't. Why does this happen, and what am I doing wrong?

Code

// ds is the connection to discord, required for doing stuff inside handlers
func iterateChangeStream(stream *mongo.ChangeStream, ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
    defer stream.Close(ctx)
    defer cancel() // for graceful crashing

    for stream.Next(ctx) {
        var event bson.M
        if err := stream.Decode(&event); err != nil {
            // %v instead of %w: the wrapping verb is only understood by fmt.Errorf
            log.Printf("Failed to decode event: %v", err)
            return
        }

        // operationType is a plain string, so a direct type assertion is enough
        opType, ok := event["operationType"].(string)
        if !ok {
            log.Print("String expected in operationType")
            return
        }
        
        // event["fullDocument"] will be empty even when handling insertion
        // models.Player is a struct representing a document of the collection
        // I'm watching over
        doc, ok := event["fullDocument"].(models.Player)
        if !ok {
            log.Print("Failed to convert document into Player type")
            return
        }
        handlerCtx := context.WithValue(ctx, "doc", doc)
        // handlerToEvent maps operationType to respective handler
        go handlerToEvent[opType](ds, handlerCtx, cancel)
    }
}

func WatchEvents(ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {

    pipeline := mongo.Pipeline{
        bson.D{{
            "$match",
            bson.D{{
                "$or", bson.A{
                    bson.D{{"operationType", "insert"}}, // !!!
                    bson.D{{"operationType", "delete"}},
                    bson.D{{"operationType", "invalidate"}},
                },
            }},
        }},
    }
    // mongo instance is initialized on program startup and stored in a global variable
    opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
    stream, err := db.Instance.Collection.Watch(ctx, pipeline, opts)
    if err != nil {
        log.Panic(err)
    }
    defer stream.Close(ctx)

    iterateChangeStream(stream, ds, ctx, cancel)
}

My issue might be related to this, except that it occurs consistently on insertion instead of occasionally on updates. If you know how to enable the change stream optimization feature flag mentioned in the link above, let me know.

Feel free to ask for more clarifications.


Solution

  • The question was answered here.

    TL;DR

    Decode the event into a typed struct instead of a generic bson.M:

    type CSEvent struct {
        OperationType string        `bson:"operationType"`
        FullDocument  models.Player `bson:"fullDocument"`
    }
    
    var event CSEvent
    if err := stream.Decode(&event); err != nil {
        log.Printf("Failed to decode event: %v", err)
        return
    }

    event.FullDocument will then contain a copy of the inserted document.
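
As a rough illustration of why the original type assertion reported a missing document: when an event is decoded into a generic map, the nested fullDocument arrives as a generic value too (a map in this sketch; the exact generic type may depend on the driver version), so asserting it to a struct type fails even though the data is there. The sketch below runs without the MongoDB driver. M is a local stand-in with the same underlying type as bson.M, and Player and assertPlayer are hypothetical placeholders, not names from the question's code base.

```go
package main

import "fmt"

// M is a local stand-in with the same underlying type as bson.M
// (map[string]interface{}), so the sketch runs without the driver.
type M map[string]interface{}

// Player is a hypothetical placeholder for models.Player.
type Player struct {
	Name string
}

// assertPlayer mirrors the failing line from the question: it asserts
// that event["fullDocument"] holds a Player value.
func assertPlayer(event M) (Player, bool) {
	doc, ok := event["fullDocument"].(Player)
	return doc, ok
}

func main() {
	// Assumed shape of a generically decoded event: the nested document
	// is itself a map, not a Player struct.
	event := M{
		"operationType": "insert",
		"fullDocument":  M{"name": "alice"},
	}

	_, ok := assertPlayer(event)
	fmt.Println("assertion to Player succeeded:", ok)

	// The data is present; it just has a generic type.
	nested, isMap := event["fullDocument"].(M)
	fmt.Println("nested is a map:", isMap, "name:", nested["name"])
}
```

A type assertion never converts between types; it only checks the dynamic type stored in the interface. Decoding straight into a struct with a typed FullDocument field, as in the answer above, lets the decoder do the conversion instead.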