I'm trying to run a spark-submit job that uses a MongoDB instance on a remote machine through the Mongo-Spark Connector.
When I start the mongod service without the --auth flag and run spark-submit like this:
./bin/spark-submit --master spark://10.0.3.155:7077 \
--conf "spark.mongodb.input.uri=mongodb://10.0.3.156/test.coll?readPreference=primaryPreferred" \
--conf "spark.mongodb.output.uri=mongodb://10.0.3.156/test.coll" \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
app1.py
Everything works like a charm.
But when I start the mongod service with the --auth flag and run spark-submit like this:
./bin/spark-submit --master spark://10.0.3.155:7077 \
--conf "spark.mongodb.input.uri=mongodb://admin:pass@10.0.3.156/test.coll?readPreference=primaryPreferred" \
--conf "spark.mongodb.output.uri=mongodb://admin:pass@10.0.3.156/test.coll" \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
app1.py
I get this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o47.save. : com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=10.0.3.156:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='admin', source='test', password=<hidden>, mechanismProperties={}}}, caused by {com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server 10.0.3.156:27017. The full response is { "ok" : 0.0, "code" : 18, "errmsg" : "Authentication failed." }}}]
at com.mongodb.connection.BaseCluster.createTimeoutException(BaseCluster.java:369)
at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:101)
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:158)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:133)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:128)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:118)
at com.mongodb.operation.DropCollectionOperation.execute(DropCollectionOperation.java:54)
at com.mongodb.operation.DropCollectionOperation.execute(DropCollectionOperation.java:39)
at com.mongodb.Mongo.execute(Mongo.java:781)
at com.mongodb.Mongo$2.execute(Mongo.java:764)
at com.mongodb.MongoCollectionImpl.drop(MongoCollectionImpl.java:419)
at com.mongodb.spark.sql.DefaultSource$$anonfun$createRelation$1.apply(DefaultSource.scala:89)
at com.mongodb.spark.sql.DefaultSource$$anonfun$createRelation$1.apply(DefaultSource.scala:89)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
I've checked the credentials and roles, and everything looks fine. I can't figure out what I'm doing wrong.
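You need to set the authSource query parameter in your connection string. Without it, the driver authenticates against the database named in the URI path (test here), which is why the error reports source='test'. Assuming the admin user was created in the admin database, add authSource=admin to both URIs and keep the credentials:
spark.mongodb.input.uri=mongodb://admin:pass@10.0.3.156/test.coll?authSource=admin&readPreference=primaryPreferred
spark.mongodb.output.uri=mongodb://admin:pass@10.0.3.156/test.coll?authSource=admin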
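If you generate these URIs from a script, a minimal sketch like the one below (Python 3, standard library only) may help avoid typos. The mongo_uri helper is hypothetical, not part of the connector; the host, credentials and database names are the ones from the question. It also percent-encodes the username and password, which matters if the password contains characters such as @, : or /.

```python
from urllib.parse import quote, urlencode


def mongo_uri(host, db, coll, user=None, password=None, auth_source=None, **options):
    """Hypothetical helper: build a mongodb:// URI for spark.mongodb.*.uri."""
    creds = ""
    if user is not None:
        # Percent-encode reserved characters (e.g. '@', ':', '/') in the credentials.
        creds = f"{quote(user, safe='')}:{quote(password or '', safe='')}@"
    params = {}
    if auth_source is not None:
        # Database holding the user; if omitted, the driver uses `db` from the path.
        params["authSource"] = auth_source
    params.update(options)  # e.g. readPreference="primaryPreferred"
    query = f"?{urlencode(params)}" if params else ""
    return f"mongodb://{creds}{host}/{db}.{coll}{query}"


input_uri = mongo_uri("10.0.3.156", "test", "coll", "admin", "pass",
                      auth_source="admin", readPreference="primaryPreferred")
output_uri = mongo_uri("10.0.3.156", "test", "coll", "admin", "pass",
                       auth_source="admin")

# Print the values to pass to spark-submit via --conf.
print(f"spark.mongodb.input.uri={input_uri}")
print(f"spark.mongodb.output.uri={output_uri}")
```

The printed values match the two config lines above and can be passed to spark-submit with --conf as in the question.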