Tags: java, mongodb, cdc

Mongo CDC throws BSONObjectTooLarge. How to ignore this and proceed further?


I would like to listen to only 3 collections in a database: c1, c2, and c3. I was not able to figure out how to limit the listening to just these 3 collections. Below is my code.

  1. I would like to ignore this error and proceed further. How can I do that? In this case the cursor itself is not getting created.
  2. As mentioned above, is there a way to limit the listening to only the collections c1, c2, and c3, on the db side? The code below listens to the full db and then filters the collections on the Java side.
        List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete", "update"))));
        MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
        String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
        if (resumeTokenStr == null) {
            cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
        } else {
            BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
            cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
        }
        return cursor;
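For readers more used to shell syntax: the pipeline built by `match(in("operationType", ...))` above should correspond to a single `$match` stage of roughly this JSON shape:

```json
[
  { "$match": { "operationType": { "$in": ["insert", "delete", "update"] } } }
]
```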

The above code throws the following error:

com.mongodb.MongoCommandException: Command failed with error 10334 (BSONObjectTooLarge): 'BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }' on server crm-mongo-report01.prod.phenom.local:27017. The full response is {"operationTime": {"$timestamp": {"t": 1664707966, "i": 25}}, "ok": 0.0, "errmsg": "BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: \"826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004\" }", "code": 10334, "codeName": "BSONObjectTooLarge", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1664707966, "i": 26}}, "signature": {"hash": {"$binary": {"base64": "NZDJKhCse19Eud88kNh7XRWRgas=", "subType": "00"}}, "keyId": 7113062344413937666}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:413)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:337)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:644)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:240)
    at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:226)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:116)
    at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:345)
    at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:232)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:214)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:575)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:574)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
    at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:573)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:211)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:217)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:197)
    at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
    at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:347)
    at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:343)
    at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:538)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:343)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:221)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:174)
    at com.company.cdc.services.CDCMain.getCursorAtResumeToken(CdcServiceMain.java:217)

Line 217 points to the line: cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();


Solution

  • This is how I ended up solving the problem (in case somebody else is searching for a solution to this problem):

    1. Removed FullDocument.UPDATE_LOOKUP when creating the cursor, so the code now looks like cursor = mongoClient.watch(pipeline).batchSize(20000).cursor(); This avoids the gigantic field that could end up in the looked-up document, which was what eventually errored out. This worked.

    2. In my case I didn't have to listen to updates on the collections that contained this bad data, so I modified my cursor to listen only to the databases and collections of interest, instead of listening to the entire database and ignoring the unwanted collections later. Below is the code.

    3. When writing to the destination, I did a bulk lookup on MongoDB, constructed the full document, and only then wrote it out. This lazy-lookup approach greatly reduced the memory footprint of the Java program.

    
        private List<Bson> generatePipeline(CdcConfigs cdcConfig) {
            List<String> whiteListedCollections = getWhitelistedCollection(cdcConfig);
            List<String> whiteListedDbs = getWhitelistedDbs(cdcConfig);
            log.info("whitelisted dbs:" + whiteListedDbs + " coll:" + whiteListedCollections);
            List<Bson> pipeline;
            if (whiteListedDbs.size() > 0)
                pipeline = singletonList(match(and(
                        in("ns.db", whiteListedDbs),
                        in("ns.coll", whiteListedCollections),
                        in("operationType", asList("insert", "delete", "update")))));
            else
                pipeline = singletonList(match(and(
                        in("ns.coll", whiteListedCollections),
                        in("operationType", asList("insert", "delete", "update")))));
            return pipeline;
        }
    
        private MongoChangeStreamCursor<ChangeStreamDocument<Document>> getCursorAtResumeToken(CdcConfigs cdcConfig, MongoClient mongoClient) {
            List<Bson> pipeline = generatePipeline(cdcConfig);
            MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
            String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
            if (resumeTokenStr == null) {
    //            cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
                cursor = mongoClient.watch(pipeline).batchSize(20000).cursor();
                log.warn("RESUME TOKEN IS NULL. READING CDC FROM CURRENT TIMESTAMP FROM MONGO DB !!! ");
            } else {
                BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
                cursor = mongoClient.watch(pipeline).batchSize(20000).maxAwaitTime(30, TimeUnit.MINUTES).startAfter(resumeToken).cursor();
    //            cursor = mongoClient.watch(pipeline).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
            }
            return cursor;
        }
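Point 3 above (the lazy bulk lookup that replaces FullDocument.UPDATE_LOOKUP) can be sketched roughly as follows. The batching helper is the relevant part: accumulate the `_id`s from a batch of change events, then fetch the full documents with a single `$in` query per chunk. The chunk size, the collection handle, and `writeToDestination` are assumptions for illustration, not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;

public class LazyLookup {

    // Split the accumulated _ids into chunks so that each $in query
    // stays a reasonable size (the chunk size is an assumption; tune it).
    static <T> List<List<T>> partition(List<T> ids, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < ids.size(); i += batchSize) {
            batches.add(new ArrayList<>(ids.subList(i, Math.min(i + batchSize, ids.size()))));
        }
        return batches;
    }

    // For each chunk, one round trip fetches the full documents, e.g.
    // with the sync driver (shown as a comment for shape only):
    //
    //   for (List<Object> batch : partition(ids, 1000)) {
    //       collection.find(Filters.in("_id", batch))
    //                 .forEach(doc -> writeToDestination(doc));
    //   }
}
```

Compared with UPDATE_LOOKUP, this trades one server-side lookup per event for one query per chunk of events, which is what reduced the memory footprint here.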
    
    

    The solutions available so far lean towards Python code, so it was a challenge translating them into Java.