MongoDB 4.4 and the corresponding Go driver are used. The database's replica set runs locally at localhost:27017, localhost:27020. I've also tried Atlas's sandbox cluster, which gave me the same results.
According to Mongo's documentation, when handling the insertion of a new document, the fullDocument field of the event data is supposed to contain the newly inserted document, which for some reason is not the case for me. The ns field, where the database and collection names are supposed to be, and the documentKey field, where the affected document's _id is stored, are empty as well. The operationType field contains the correct operation type. In another test, update operations did not appear in the change stream at all.
It used to work as it should but now it doesn't. Why does it happen and what am I doing wrong?
// ds is the connection to discord, required for doing stuff inside handlers
func iterateChangeStream(stream *mongo.ChangeStream, ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
	defer stream.Close(ctx)
	defer cancel() // for graceful crashing
	for stream.Next(ctx) {
		var event bson.M
		err := stream.Decode(&event)
		if err != nil {
			log.Print(errors.Errorf("Failed to decode event: %w\n", err))
			return
		}
		rv := reflect.ValueOf(event["operationType"]) // getting operation type
		opType, ok := rv.Interface().(string)
		if !ok {
			log.Print("String expected in operationType\n")
			return
		}
		// event["fullDocument"] will be empty even when handling insertion
		// models.Player is a struct representing a document of the collection
		// I'm watching over
		doc, ok := event["fullDocument"].(models.Player)
		if !ok {
			log.Print("Failed to convert document into Player type")
			return
		}
		handlerCtx := context.WithValue(ctx, "doc", doc)
		// handlerToEvent maps operationType to respective handler
		go handlerToEvent[opType](ds, handlerCtx, cancel)
	}
}
func WatchEvents(ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
	pipeline := mongo.Pipeline{
		bson.D{{
			"$match",
			bson.D{{
				"$or", bson.A{
					bson.D{{"operationType", "insert"}}, // !!!
					bson.D{{"operationType", "delete"}},
					bson.D{{"operationType", "invalidate"}},
				},
			}},
		}},
	}
	// mongo instance is initialized on program startup and stored in a global variable
	opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
	stream, err := db.Instance.Collection.Watch(ctx, pipeline, opts)
	if err != nil {
		log.Panic(err)
	}
	defer stream.Close(ctx)
	iterateChangeStream(stream, ds, ctx, cancel)
}
My issue might be related to this, except that it occurs consistently on insertion instead of occurring only sometimes on updates. If you know how to enable the change stream optimization feature flag mentioned in the link above, let me know.
Feel free to ask for more clarifications.
The question was answered here.
You need to create the following structure to unmarshal event into:
type CSEvent struct {
	OperationType string        `bson:"operationType"`
	FullDocument  models.Player `bson:"fullDocument"`
}
var event CSEvent
err := stream.Decode(&event)
event.FullDocument will then contain a copy of the inserted document.
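
To illustrate why decoding into a typed struct helps, here is a minimal, self-contained sketch. It uses the standard library's encoding/json as a stand-in for the BSON decoder, on the assumption that the same principle applies: a value decoded into a generic map holds generic map and document types, so a type assertion to a concrete struct does not succeed, while decoding straight into a struct fills the nested fields. Player, Namespace, ChangeEvent and the sample data are hypothetical and only mimic the fields discussed in the question.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Player is a hypothetical stand-in for models.Player.
type Player struct {
	ID   string `json:"_id"`
	Name string `json:"name"`
}

// Namespace mirrors the "ns" field (database and collection names).
type Namespace struct {
	DB   string `json:"db"`
	Coll string `json:"coll"`
}

// ChangeEvent is a typed view of the event fields discussed above.
type ChangeEvent struct {
	OperationType string    `json:"operationType"`
	FullDocument  Player    `json:"fullDocument"`
	NS            Namespace `json:"ns"`
}

// sampleEvent is made-up data shaped like an insert event.
const sampleEvent = `{
  "operationType": "insert",
  "fullDocument": {"_id": "p1", "name": "alice"},
  "ns": {"db": "game", "coll": "players"}
}`

// assertFromMap decodes into a generic map and then tries the same kind of
// type assertion as the question's code. The nested value is itself a
// generic map, so the assertion to Player reports ok == false.
func assertFromMap(raw []byte) (Player, bool, error) {
	var event map[string]interface{}
	if err := json.Unmarshal(raw, &event); err != nil {
		return Player{}, false, err
	}
	doc, ok := event["fullDocument"].(Player)
	return doc, ok, nil
}

// decodeTyped decodes straight into the typed struct, letting the decoder
// populate the nested Player and Namespace values.
func decodeTyped(raw []byte) (ChangeEvent, error) {
	var event ChangeEvent
	err := json.Unmarshal(raw, &event)
	return event, err
}

func main() {
	_, ok, err := assertFromMap([]byte(sampleEvent))
	if err != nil {
		panic(err)
	}
	fmt.Println("assertion from generic map succeeded:", ok)

	event, err := decodeTyped([]byte(sampleEvent))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s on %s.%s: %+v\n",
		event.OperationType, event.NS.DB, event.NS.Coll, event.FullDocument)
}
```

With the real driver the struct tags would be bson tags, as in CSEvent above, and the value passed to stream.Decode would be a pointer to that struct. The data may well be present in the generic map too, just not as a models.Player, which would be consistent with the "Failed to convert document into Player type" branch being hit.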