Tags: mongodb, performance, pyspark, consistency

MongoDB sharded cluster writing more records than inserted


I have a Spark DataFrame with around 43 million records, which I'm trying to write to a MongoDB collection. When I write it to an unsharded collection, the number of output records matches what I inserted. But when I write the same data to a sharded collection (hashed), the number of records increases by 3 million.

What's interesting is that the number of records keeps fluctuating even after my Spark job has completed (there are no other connections to the collection).

When I did the same with a range-sharded collection, the number of records was consistent. (Edit: even with the range-sharded collection, it started fluctuating after a while.)

Can someone help me understand why this is happening? Also, I'm sharding my collection because I have to write about 300 billion records every day and want to increase my write throughput, so any other suggestions would be really appreciated.

I have 3 shards, each replicated on 3 instances

I'm not using any other options in the Spark MongoDB connector, only ordered=False.

Edit: The record count seemed to stabilize after a few hours at the correct number, but it would still be great if someone could help me understand why MongoDB exhibits this behaviour.


Solution

  • The confusion comes from the difference between collection metadata and logical documents while balancing is in progress.

    The bottom line is you should use db.collection.countDocuments() if you need an accurate count.

    Deeper explanation:

    When MongoDB shards a collection, it assigns ranges of the documents (chunks) to each shard. As you insert documents, these ranges will usually grow unevenly, so the balancer process will split ranges into smaller ones when necessary to keep their data size about the same.

    It also moves these chunks between shards so that each shard has about the same number of chunks.

    The process of moving a chunk from one shard to another involves copying all of the documents in that range, verifying they have all been written to the new shard, then deleting them from the old shard. This means that the documents being moved will exist on both shards for a while.
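
    The copy, verify, delete sequence above can be illustrated with a toy simulation. This is only a sketch in plain Python, not MongoDB code: the Shard class, the chunk names, and the three helper functions are all hypothetical, made up to show why a count of stored documents is inflated while a migration is in flight, whereas a count restricted to owned chunks is not.

```python
from dataclasses import dataclass, field


@dataclass
class Shard:
    """Toy shard (hypothetical): stored documents plus the chunks it owns."""
    name: str
    docs: dict = field(default_factory=dict)        # doc_id -> chunk_id
    owned_chunks: set = field(default_factory=set)

    def metadata_count(self):
        # Counts every stored document, orphaned copies included.
        return len(self.docs)

    def filtered_count(self):
        # Counts only documents in chunks this shard currently owns.
        return sum(1 for chunk in self.docs.values() if chunk in self.owned_chunks)


def copy_chunk(src, dst, chunk_id):
    """Step 1: copy the chunk's documents to the destination shard."""
    for doc_id, chunk in src.docs.items():
        if chunk == chunk_id:
            dst.docs[doc_id] = chunk


def commit_ownership(src, dst, chunk_id):
    """Step 2: once the copy is verified, ownership moves to the destination."""
    src.owned_chunks.discard(chunk_id)
    dst.owned_chunks.add(chunk_id)


def delete_from_source(src, chunk_id):
    """Step 3: delete the leftover copies from the source shard."""
    src.docs = {d: c for d, c in src.docs.items() if c != chunk_id}


def totals(shards):
    """Return (sum of metadata counts, sum of filtered counts)."""
    return (sum(s.metadata_count() for s in shards),
            sum(s.filtered_count() for s in shards))


# 100 documents on shard A: 60 in chunk0, 40 in chunk1. Shard B starts empty.
a = Shard("shardA",
          docs={i: ("chunk0" if i < 60 else "chunk1") for i in range(100)},
          owned_chunks={"chunk0", "chunk1"})
b = Shard("shardB")

before = totals([a, b])
copy_chunk(a, b, "chunk1")
during_copy = totals([a, b])
commit_ownership(a, b, "chunk1")
after_commit = totals([a, b])
delete_from_source(a, "chunk1")
after_cleanup = totals([a, b])

print(before, during_copy, after_commit, after_cleanup)
# (100, 100) (140, 100) (140, 100) (100, 100)
```

    The first number in each pair rises to 140 while the 40 documents of chunk1 live on both shards and only returns to 100 after the delete step, which mirrors the fluctuation described in the question.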

    When you submit a query via mongos, the shard will perform a filter stage to exclude documents in chunks that have not been fully moved to this shard, or that have not yet been deleted after a chunk was fully moved out.

    To count documents with the benefit of this filter, use db.collection.countDocuments().
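
    Since the question uses PySpark, here is a minimal sketch of the same check from Python. It assumes the pymongo driver, where count_documents() corresponds to countDocuments() and estimated_document_count() returns the metadata-based figure. The connection string, database name, and collection name in main() are placeholders, not values from the question.

```python
def compare_counts(collection):
    """Return metadata-based and filtered counts for a pymongo Collection."""
    # Fast count taken from collection metadata; on a sharded cluster it
    # does not filter out orphaned documents.
    metadata_count = collection.estimated_document_count()
    # Count computed by actually matching documents (empty filter = all),
    # so it benefits from the shard filter described above.
    accurate_count = collection.count_documents({})
    return {
        "metadata": metadata_count,
        "accurate": accurate_count,
        "surplus": metadata_count - accurate_count,
    }


def main():
    # Imported here so compare_counts() can be used without a driver installed.
    from pymongo import MongoClient

    # Placeholder URI and names (assumptions); point this at your mongos.
    client = MongoClient("mongodb://localhost:27017")
    collection = client["mydb"]["mycollection"]
    print(compare_counts(collection))
```

    Calling main() against the cluster while the balancer is running should show a positive surplus that shrinks to zero once the migrated ranges have been cleaned up. Note that count_documents({}) scans the collection, so on tens of millions of documents it will be noticeably slower than the metadata count.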

    Each mongod maintains metadata for each collection it holds, which includes a count of documents. This count is incremented for each insert and decremented for each delete. The metadata count can't exclude orphan documents from incomplete migrations.

    The document count returned by db.collection.stats() is based on the metadata. This means that if the balancer is migrating any chunks, the copied but not yet deleted documents will be reported by both shards, so the overall count will be higher.
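
    To see which shards are contributing the extra documents, the metadata counts can be broken down per shard. The sketch below assumes pymongo and the $collStats aggregation stage with its count option, which on a sharded cluster is expected to return one result document per shard with a "shard" field. The function names are my own, and the "unsharded" fallback key is an assumption for the case where that field is absent.

```python
def per_shard_metadata_counts(collection):
    """Return {shard_name: metadata_count} for a pymongo Collection."""
    pipeline = [{"$collStats": {"count": {}}}]
    counts = {}
    for doc in collection.aggregate(pipeline):
        # "shard" is expected on sharded clusters; fall back otherwise.
        shard = doc.get("shard", "unsharded")
        counts[shard] = counts.get(shard, 0) + doc["count"]
    return counts


def metadata_surplus(collection):
    """Metadata total minus filtered count: documents reported twice."""
    per_shard = per_shard_metadata_counts(collection)
    return sum(per_shard.values()) - collection.count_documents({})
```

    A surplus above zero that drifts over time and then settles is consistent with chunk migrations being in progress rather than with duplicate inserts.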