Tags: mongodb, ssl, pyspark

Cannot connect Apache Spark to MongoDB with SSL


I have successfully installed Apache Spark on Ubuntu 18.04 and added the mongo-spark-connector to my Spark installation. I am trying to connect to an externally hosted MongoDB cluster. I can connect to this cluster from various other clients with SSL enabled (the MongoDB server requires SSL), but when I try to connect through Spark, the connection times out. To connect over SSL I normally use a private key (pk.pem) and a CA certificate (ca.crt).

I have done the following setup:

  • I have converted the private key file from PEM format to PKCS12 format
  • I have converted the CA certificate into PEM format
  • I have created a new keystore and added my newly formatted PKCS12 file (using keytool)
  • I have created a new truststore and added my CA certificate in PEM format (using keytool)
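
For reference, the conversion steps above were done roughly like this. This is a sketch, not the exact commands: client.crt, the mongo-client/mongo-ca aliases, and the assumption that the original ca.crt is DER-encoded are mine.

# 1. Bundle the private key and its client certificate into PKCS12
openssl pkcs12 -export -inkey pk.pem -in client.crt \
    -out client.p12 -name mongo-client -passout pass:kspassword

# 2. Convert the CA certificate to PEM (assuming the original ca.crt is DER-encoded)
openssl x509 -inform der -in ca.crt -out ca.pem

# 3. Create a new keystore from the PKCS12 bundle
keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 \
    -srcstorepass kspassword \
    -destkeystore /path/to/keystore.ks -deststorepass kspassword

# 4. Create a new truststore containing the CA certificate
keytool -importcert -file ca.pem -alias mongo-ca -noprompt \
    -keystore /path/to/truststore.ks -storepass tspassword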

I currently start my script with the following command. All four javax.net.ssl properties go into a single --driver-java-options string (and a single spark.executor.extraJavaOptions value), since repeating those flags makes each occurrence override the previous one:

spark-submit \
  --driver-java-options "-Djavax.net.ssl.trustStore=/path/to/truststore.ks \
    -Djavax.net.ssl.trustStorePassword=tspassword \
    -Djavax.net.ssl.keyStore=/path/to/keystore.ks \
    -Djavax.net.ssl.keyStorePassword=kspassword" \
  --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.ks \
    -Djavax.net.ssl.trustStorePassword=tspassword \
    -Djavax.net.ssl.keyStore=/path/to/keystore.ks \
    -Djavax.net.ssl.keyStorePassword=kspassword" \
  script.py
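
To sanity-check the files those -D options point at, keytool can list the contents of both stores (same paths and passwords as above); its output also reports the keystore type near the top:

keytool -list -keystore /path/to/keystore.ks -storepass kspassword
keytool -list -keystore /path/to/truststore.ks -storepass tspassword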

The following is my PySpark code:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sqlContext = SQLContext(SparkContext.getOrCreate())  # needed under spark-submit; the PySpark shell predefines it

mongo_url = 'mongodb://<user>:<pass>@<host>:<port>/db.collection?replicaSet=replica-set-name&ssl=true&authSource=test&readPreference=nearest'
mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_url).load()

When I execute the script, the connection times out with the following output:

19/11/14 15:36:47 INFO SparkContext: Created broadcast 0 from broadcast at MongoSpark.scala:542
19/11/14 15:36:47 INFO cluster: Cluster created with settings {hosts=[<host>:<port>], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='haip-replica-set'}
19/11/14 15:36:47 INFO cluster: Adding discovered server <host>:<port> to client view of cluster
19/11/14 15:36:47 INFO MongoClientCache: Creating MongoClient: [<host>:<port>]
19/11/14 15:36:47 INFO cluster: No server chosen by com.mongodb.client.internal.MongoClientDelegate$1@140f4273 from cluster description ClusterDescription{type=REPLICA_SET, connectionMode=MULTIPLE, serverDescriptions=[ServerDescription{address=<host>:<port>, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
19/11/14 15:36:47 INFO cluster: Exception in monitor thread while connecting to server mongodb-data-1.haip.io:27017
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:105)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:62)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:129)
    at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "/home/user/script.py", line 16, in <module>
    mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_url).load()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@140f4273. Client view of cluster state is {type=REPLICA_SET, servers=[{address=<host>:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
    at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:408)
    at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:123)
    at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:54)
    at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:147)
    at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:100)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:277)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:181)
    at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:186)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:155)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:150)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:157)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:237)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
    at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
    at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

19/11/14 15:37:17 INFO SparkContext: Invoking stop() from shutdown hook
19/11/14 15:37:17 INFO MongoClientCache: Closing MongoClient: [<host>:<port>]

  • Spark version: 2.4.4
  • Scala version: 2.11
  • Mongo Java Driver version: 3.11.2
  • Mongo Spark Connector version: 2.11-2.4.1


Solution

  • The keystore and truststore were in an incorrect format (.ks). Once I recreated them in the correct JKS format (.jks) and pointed the javax.net.ssl options at the new files, the connection worked.
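
A sketch of that fix, reusing the file names assumed earlier: regenerate both stores so they are explicitly written out as JKS, then update the paths in the spark-submit options to the .jks files.

# Rebuild the keystore, explicitly as JKS
keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 \
    -srcstorepass kspassword \
    -destkeystore /path/to/keystore.jks -deststoretype JKS -deststorepass kspassword

# Rebuild the truststore, explicitly as JKS
keytool -importcert -file ca.pem -alias mongo-ca -noprompt \
    -keystore /path/to/truststore.jks -storetype JKS -storepass tspassword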