Search code examples
sslgoogle-cloud-platformapache-kafkaspark-structured-streaminggoogle-cloud-dataproc

GCP Dataproc - Failed to construct kafka consumer, Failed to load SSL keystore dataproc.jks of type JKS


I'm trying to run a Structured Streaming program on GCP Dataproc, which accesses the data from Kafka and prints it.

Access to Kafka is using SSL, and the truststore and keystore files are stored in buckets. I'm using Google Storage API to access the bucket, and store the file in the current working directory. The truststore and keystores are passed onto the Kafka Consumer/Producer. However - i'm getting an error

Command :

gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py  --cluster dataproc-ss-poc  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 --region us-central1

Code is shown below :

spark = SparkSession.builder.appName('StructuredStreaming_VersaSase').getOrCreate()

kafkaBrokers='<broker-ip>:9094'
topic = "versa-sase"
security_protocol="SSL"

# Google Storage API to access the keys in the buckets
client = storage.Client()
bucket = client.get_bucket('ssl-certs-karan')

blob_ssl_truststore = bucket.get_blob('cap12.jks')
ssl_truststore_location = '{}/{}'.format(os.getcwd(), blob_ssl_truststore.name) 
blob_ssl_truststore.download_to_filename(ssl_truststore_location)

ssl_truststore_password="<ssl_truststore_password>"

blob_ssl_keystore = bucket.get_blob('dataproc-versa-sase-p12-1.jks')
ssl_keystore_location = '{}/{}'.format(os.getcwd(), blob_ssl_keystore.name) 
blob_ssl_keystore.download_to_filename(ssl_keystore_location)


ssl_keystore_password="<ssl_keystore_password>"
consumerGroupId = "versa-sase-grp"
checkpoint = "gs://ss-checkpoint/"

print(" SPARK.SPARKCONTEXT -> ", spark.sparkContext)


df = spark.read.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .load()

   print(" df -> ", df)
   query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)", "topic", "timestamp") \
    .write \
    .format("console") \
    .option("numRows",100)\
    .option("checkpointLocation", checkpoint) \
    .option("outputMode", "complete")\
    .option("truncate", "false") \
    .save("output")

Error :

Traceback (most recent call last):
  File "/tmp/3e7304f8e27d4436a2f382280cebe7c5/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py", line 83, in <module>
    query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)", "topic", "timestamp") \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.Client: IPC Client (1416219052) connection to dataproc-ss-poc-m/10.128.0.78:8030 from root sending #171 org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB.allocate
22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.Client: IPC Client (1416219052) connection to dataproc-ss-poc-m/10.128.0.78:8030 from root got value #171
22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine: Call: allocate took 2ms
: An error occurred while calling o84.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (dataproc-ss-poc-w-0.c.versa-kafka-poc.internal executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:124)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:61)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
    at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactory.java:60)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:342)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:265)
    at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.borrowObject(InternalKafkaConsumerPool.scala:84)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.retrieveConsumer(KafkaDataConsumer.scala:573)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:558)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$getAvailableOffsetRange$1(KafkaDataConsumer.scala:359)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:618)
    at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getAvailableOffsetRange(KafkaDataConsumer.scala:358)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD.resolveRange(KafkaSourceRDD.scala:123)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:75)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
...

Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/3e7304f8e27d4436a2f382280cebe7c5/dataproc-versa-sase-p12-1.jks of type JKS
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:377)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:349)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:299)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
    at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:138)
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:737)
    ... 53 more
Caused by: java.nio.file.NoSuchFileException: /tmp/3e7304f8e27d4436a2f382280cebe7c5/dataproc-versa-sase-p12-1.jks
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
    at java.nio.file.Files.newInputStream(Files.java:152)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:370)

From my mac, I'm using PKCS files (.p12) and am able to access the Kafka cluster in SSL mode. However, in Dataproc - it seems the expected file format is JKS.

here is the command i used to convert .p12 file to JKS format:

keytool -importkeystore -srckeystore dataproc-versa-sase.p12 -srcstoretype pkcs12 -srcalias versa-sase-user -destkeystore dataproc-versa-sase-p12-1.jks -deststoretype jks -deststorepass <password> -destalias versa-sase-user

What needs to be done to fix this ? it seems the JKS file is not accessible to the Spark program ?

tia!


Solution

  • per note from @OneCricketer, i was able to get this working by using --files <gs://cert1>,<gs://cert2>. Also, this works when using the cluster mode.

    ClusterMode command

    gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py  --cluster dataproc-ss-poc  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.submit.deployMode=cluster --region us-central1
    

    Client Mode :

    gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py  --cluster dataproc-ss-poc  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --region us-central1
    

    Accessing the certs in the Driver:

    # access using the cert name
    ssl_truststore_location="ca.p12"
    ssl_keystore_location="dataproc-versa-sase.p12"
    
    df_stream = spark.readStream.format('kafka') \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password) \
        .option("kafka.bootstrap.servers",kafkaBrokers)\
        .option("subscribe", topic) \
        .option("kafka.group.id", consumerGroupId)\
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", "false") \
        .option("maxOffsetsPerTrigger", 10) \
        .load()