I would like to listen to only 3 collections in a database: c1, c2, c3. I was not able to figure out how to limit the change stream to these 3 collections on the db side. The code below listens to the full database and then filters the collections on the Java side:
List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete", "update"))));
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
if (resumeTokenStr == null) {
cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
} else {
BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
}
return cursor;
The above code throws the following error:
com.mongodb.MongoCommandException: Command failed with error 10334 (BSONObjectTooLarge): 'BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }' on server crm-mongo-report01.prod.phenom.local:27017. The full response is {"operationTime": {"$timestamp": {"t": 1664707966, "i": 25}}, "ok": 0.0, "errmsg": "BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: \"826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004\" }", "code": 10334, "codeName": "BSONObjectTooLarge", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1664707966, "i": 26}}, "signature": {"hash": {"$binary": {"base64": "NZDJKhCse19Eud88kNh7XRWRgas=", "subType": "00"}}, "keyId": 7113062344413937666}}}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:413)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:337)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:644)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:240)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:226)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:116)
at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:345)
at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:232)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:214)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:575)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:574)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:573)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:211)
at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:217)
at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:197)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:347)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:343)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:538)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:343)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:221)
at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:174)
at com.company.cdc.services.CDCMain.getCursorAtResumeToken(CdcServiceMain.java:217)
Line 217 points to the line: cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
This is how I ended up solving the problem (in case somebody else is searching for a solution to this problem).
I removed FullDocument.UPDATE_LOOKUP when creating the cursor, so now my code looks like:
cursor = mongoClient.watch(pipeline).batchSize(20000).cursor();
This avoids the gigantic looked-up document that could end up in the change event, which was eventually erroring out. This worked.
In my case I didn't have to listen to updates on the collection that had this bad data, so I modified my pipeline to listen only to the databases and collections of interest, instead of listening to the entire database and ignoring the unwanted collections later. Below is the code.
When writing to the destination, I did a bulk lookup on MongoDB, constructed the full documents, and then wrote them. This lazy-lookup approach greatly reduced the memory footprint of the Java program.
private List<Bson> generatePipeline(CdcConfigs cdcConfig) {
    List<String> whiteListedCollections = getWhitelistedCollection(cdcConfig);
    List<String> whiteListedDbs = getWhitelistedDbs(cdcConfig);
    log.info("whitelisted dbs:" + whiteListedDbs + " coll:" + whiteListedCollections);
    List<Bson> pipeline;
    if (!whiteListedDbs.isEmpty()) {
        pipeline = singletonList(match(and(
                in("ns.db", whiteListedDbs),
                in("ns.coll", whiteListedCollections),
                in("operationType", asList("insert", "delete", "update")))));
    } else {
        pipeline = singletonList(match(and(
                in("ns.coll", whiteListedCollections),
                in("operationType", asList("insert", "delete", "update")))));
    }
    return pipeline;
}
private MongoChangeStreamCursor<ChangeStreamDocument<Document>> getCursorAtResumeToken(CdcConfigs cdcConfig, MongoClient mongoClient) {
    List<Bson> pipeline = generatePipeline(cdcConfig);
    MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
    String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
    if (resumeTokenStr == null) {
        // cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
        cursor = mongoClient.watch(pipeline).batchSize(20000).cursor();
        log.warn("RESUME TOKEN IS NULL. READING CDC FROM CURRENT TIMESTAMP FROM MONGO DB !!! ");
    } else {
        BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
        cursor = mongoClient.watch(pipeline).batchSize(20000).maxAwaitTime(30, TimeUnit.MINUTES).startAfter(resumeToken).cursor();
        // cursor = mongoClient.watch(pipeline).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
    }
    return cursor;
}
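The lazy-lookup step mentioned above (constructing full documents at write time instead of using UPDATE_LOOKUP) can be sketched roughly as follows. This is my own illustration, not the original code: `BulkLookup` and `partition` are hypothetical names, and the actual fetch would be something like `collection.find(Filters.in("_id", batch))` per batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for the bulk-lookup step: collect the _ids from a
// batch of change events, then fetch the full documents in chunks with
// one $in query per chunk (e.g. collection.find(Filters.in("_id", batch))).
public class BulkLookup {

    // Split the changed ids into fixed-size batches so each $in query
    // stays small and each response stays well under the 16MB BSON limit.
    public static List<List<String>> partition(List<String> ids, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < ids.size(); i += batchSize) {
            batches.add(new ArrayList<>(ids.subList(i, Math.min(i + batchSize, ids.size()))));
        }
        return batches;
    }
}
```

The batch size is a tuning knob: larger batches mean fewer round trips but larger responses.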
The solutions available so far are more inclined towards Python code, so it was a challenge translating them into Java.
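The getResumeTokenFromS3 helper used above is not shown. As a minimal stand-in, here is a sketch that persists the resume token to a local file instead of S3 (my own illustration, not the original code; `ResumeTokenStore` is a hypothetical name). The token itself is whatever `cursor.getResumeToken().toJson()` returned for the last successfully processed event.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical local-file stand-in for the S3-backed resume-token helpers.
public class ResumeTokenStore {
    private final Path path;

    public ResumeTokenStore(Path path) {
        this.path = path;
    }

    // Persist the token after each successfully processed batch; write to a
    // temp file first so a crash mid-write cannot corrupt the stored token.
    public void save(String resumeTokenJson) {
        try {
            Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
            Files.writeString(tmp, resumeTokenJson);
            Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns null when no token has been stored yet (first run), matching
    // the null check in getCursorAtResumeToken above.
    public String load() {
        try {
            return Files.exists(path) ? Files.readString(path) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Remove the stored token, e.g. to force a restart from the current time.
    public void delete() {
        try {
            Files.deleteIfExists(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whatever the backing store, the key point is the same as in the code above: save the token only after the corresponding events are durably written downstream, so a restart never skips events.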