Here is the problem.
I have a local mongos instance that is connected to a remote mongod.
The remote DB uses basic password authentication.
I'm trying to set up a ChangeStream watcher for a particular collection in a simple Scala application.
The actual code looks like this:
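import com.mongodb.{MongoClient, MongoClientOptions, MongoCredential, ServerAddress}
import com.mongodb.client.model.changestream.FullDocument

// Connects to the local mongos; ServerStateListener and DB are defined elsewhere in the application.
private val mongo = new MongoClient(
  new ServerAddress("localhost", 27017),
  MongoCredential.createCredential("username", "myDB", "password".toCharArray),
  MongoClientOptions.builder().addServerListener(ServerStateListener).build()
)

private val collection = mongo
  .getDatabase(DB)
  .getCollection("someObjectsCollection")

// Opens a change stream that also returns the full document for update events.
private val ch = collection
  .watch()
  .fullDocument(FullDocument.UPDATE_LOOKUP)
  .iterator()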
It breaks on the .fullDocument(FullDocument.UPDATE_LOOKUP) line with:
Exception in thread "main" com.mongodb.MongoCommandException: Command failed with error 13: 'not authorized on myDB to execute command { aggregate: "someObjectsCollection", pipeline: [ { $changeStream: { fullDocument: "updateLookup" } } ], cursor: {}, $db: "myDB", $clusterTime: { clusterTime: Timestamp(1524064297, 2), ....
That's confusing, because the same user credentials work through the mongo shell, both on the remote DB and on the local mongos. Moreover, I tried some other operations on the collection inside that application (like collection.count()) and they work! The problem appears only when I try to set up the watcher.
Finally I figured out what was wrong with my setup.
The original user 'username' that I was using to consume the change stream had a strict permission set:
"inheritedPrivileges" : [
  {
    "resource" : {
      "db" : "abuCoreDev",
      "collection" : ""
    },
    "actions" : [
      "convertToCapped",
      "createCollection",
      "createIndex",
      "dropIndex",
      "find",
      "insert",
      "listCollections",
      "listIndexes",
      "planCacheIndexFilter",
      "remove",
      "update"
    ]
  }
],
I didn't realize that I need the special changeStream permission to consume change streams! Everything works fine when I connect to mongos as root, which has that cursed permission.
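One way to spot a missing action like this before opening the stream is to ask the server for the user's privileges with the usersInfo command and showPrivileges set. The following is only a sketch: the object name PrivilegeCheck and the helper hasAction are made up for illustration, the host and credentials are the placeholders from the question, and the check ignores the collection part of the resource for brevity. It uses scala.collection.JavaConverters, which still works on Scala 2.13 but is deprecated there.

```scala
import com.mongodb.{MongoClient, MongoClientOptions, MongoCredential, ServerAddress}
import org.bson.Document
import scala.collection.JavaConverters._

object PrivilegeCheck {
  // usersInfo with showPrivileges makes the server include "inheritedPrivileges" for the user.
  def usersInfoCommand(user: String): Document =
    new Document("usersInfo", user).append("showPrivileges", java.lang.Boolean.TRUE)

  // True if any privilege on `db` (or on all databases, db == "") lists `action`.
  // Simplification: the "collection" field of the resource is not inspected.
  def hasAction(result: Document, db: String, action: String): Boolean = {
    val users = result.get("users").asInstanceOf[java.util.List[Document]].asScala
    users.exists { user =>
      val privileges =
        user.get("inheritedPrivileges").asInstanceOf[java.util.List[Document]].asScala
      privileges.exists { privilege =>
        val resource = privilege.get("resource").asInstanceOf[Document]
        val actions = privilege.get("actions").asInstanceOf[java.util.List[String]].asScala
        val resourceDb = resource.getString("db") // null for cluster-level resources
        (resourceDb == db || resourceDb == "") && actions.contains(action)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Placeholder connection details, same as in the question.
    val mongo = new MongoClient(
      new ServerAddress("localhost", 27017),
      MongoCredential.createCredential("username", "myDB", "password".toCharArray),
      MongoClientOptions.builder().build())
    try {
      val result = mongo.getDatabase("myDB").runCommand(usersInfoCommand("username"))
      println("changeStream allowed: " + hasAction(result, "myDB", "changeStream"))
    } finally mongo.close()
  }
}
```

For the restricted user shown above, a check like this would report that find is present but changeStream is not, which matches the "not authorized" error on the $changeStream aggregation.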
Here are the permissions of my root user:
{
  "resource" : {
    "db" : "",
    "collection" : ""
  },
  "actions" : [
    "bypassDocumentValidation",
    "changeCustomData",
    "changePassword",
    "changeStream",
    "collMod",
    "collStats",
    "compact",
    "convertToCapped",
    "createCollection",
    "createIndex",
    "createRole",
    "createUser",
    "dbHash",
    "dbStats",
    "dropCollection",
    "dropDatabase",
    "dropIndex",
    "dropRole",
    "dropUser",
    "emptycapped",
    "enableProfiler",
    "enableSharding",
    "find",
    "getShardVersion",
    "grantRole",
    "indexStats",
    "insert",
    "killCursors",
    "listCollections",
    "listIndexes",
    "moveChunk",
    "planCacheIndexFilter",
    "planCacheRead",
    "planCacheWrite",
    "reIndex",
    "remove",
    "renameCollectionSameDB",
    "repairDatabase",
    "revokeRole",
    "setAuthenticationRestriction",
    "splitChunk",
    "splitVector",
    "storageDetails",
    "update",
    "validate",
    "viewRole",
    "viewUser"
  ]
}
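Connecting as root is more than the watcher needs. A narrower option is a custom role that carries only find and changeStream on the watched collection, granted to the existing user. The following is a sketch under assumptions, not a tested setup for this cluster: the role name changeStreamReader, the root credentials, and the object GrantChangeStream are invented for illustration, and the commands are sent through the local mongos as a user that is allowed to manage roles.

```scala
import com.mongodb.{MongoClient, MongoClientOptions, MongoCredential, ServerAddress}
import org.bson.Document
import java.util.{Arrays, Collections}

object GrantChangeStream {
  // createRole command for a role that may only read and watch one collection.
  def createRoleCommand(role: String, db: String, coll: String): Document = {
    val resource = new Document("db", db).append("collection", coll)
    val privilege = new Document("resource", resource)
      .append("actions", Arrays.asList("find", "changeStream"))
    new Document("createRole", role)
      .append("privileges", Collections.singletonList(privilege))
      .append("roles", Collections.emptyList[String]()) // no inherited roles
  }

  // grantRolesToUser command that adds the role to an existing user.
  def grantCommand(user: String, role: String): Document =
    new Document("grantRolesToUser", user)
      .append("roles", Collections.singletonList(role))

  def main(args: Array[String]): Unit = {
    // Hypothetical admin credentials; replace with a user that can create roles.
    val admin = new MongoClient(
      new ServerAddress("localhost", 27017),
      MongoCredential.createCredential("root", "admin", "rootPassword".toCharArray),
      MongoClientOptions.builder().build())
    try {
      val db = admin.getDatabase("myDB")
      db.runCommand(createRoleCommand("changeStreamReader", "myDB", "someObjectsCollection"))
      db.runCommand(grantCommand("username", "changeStreamReader"))
    } finally admin.close()
  }
}
```

After the grant, the original client code can keep authenticating as 'username'; the user's existing privileges stay in place and the new role only adds the two actions on that one collection.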