Tags: mongodb, go, mongo-go

MongoDB doesn't retrieve all documents from a collection with 2 million records using a cursor


I have a collection of 2,000,000 records:

> db.events.count();
2000000

and I use the MongoDB Go driver to connect to the database:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/Shopify/sarama"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27888").SetAuth(options.Credential{
        Username: "mongoadmin",
        Password: "secret",
    }))

    if err != nil {
        panic(err)
    }

    defer func() {
        if err = client.Disconnect(ctx); err != nil {
            panic(err)
        }
    }()


    collection := client.Database("test").Collection("events")

    var bs int32 = 10000
    var b = true
    cur, err := collection.Find(context.Background(), bson.D{}, &options.FindOptions{
        BatchSize: &bs, NoCursorTimeout: &b})
    if err != nil {
        log.Fatal(err)
    }
    defer cur.Close(ctx)

    // runningtime and asyncProducer are helpers defined elsewhere in my code
    s, n := runningtime("retrive db from mongo and publish to kafka")
    count := 0
    for cur.Next(ctx) {
        var result bson.M
        err := cur.Decode(&result)
        if err != nil {
            log.Fatal(err)
        }

        bytes, err := json.Marshal(result)
        if err != nil {
            log.Fatal(err)
        }
        count++

        msg := &sarama.ProducerMessage{
            Topic: "hello",
            // Key:   sarama.StringEncoder("aKey"),
            Value: sarama.ByteEncoder(bytes),
        }
        asyncProducer.Input() <- msg
    }
}


But the program only retrieves about 600,000 of the 2,000,000 records every time I run it.

$ go run main.go
done
count = 605426
nErrors = 0
2020/09/18 11:23:43 End:         retrive db from mongo and publish to kafka took 10.080603336s

I don't know why. I want to retrieve all 2,000,000 records. Thanks for any help.


Solution

  • Your loop fetching the results may end early because you are using the same ctx context, which has a 10-second timeout, for iterating over the results.

    This means that if retrieving and processing the 2 million records (including connecting) takes more than 10 seconds, the context will be cancelled, and thus the cursor will also report an error.

    Note that setting FindOptions.NoCursorTimeout to true only prevents the server-side cursor from timing out due to inactivity; it does not override the timeout of the context you use.

    Use another context for executing the query and iterating over the results, one that does not have a timeout, e.g. context.Background().

    Also note that to construct the find options you should use the helper methods, so it may look as simple and elegant as this:

    options.Find().SetBatchSize(10000).SetNoCursorTimeout(true)
    

    So the working code:

    ctx2 := context.Background()
    
    cur, err := collection.Find(ctx2, bson.D{},
        options.Find().SetBatchSize(10000).SetNoCursorTimeout(true))
    
    // ...
    
    for cur.Next(ctx2) {
        // ...
    }
    
    // Also check error after the loop:
    if err := cur.Err(); err != nil {
        log.Printf("Iterating over results failed: %v", err)
    }