apache-spark, ssl, apache-kafka, jaas

Spark "Failed to construct kafka consumer" via SSL


I'm trying to set up a Spark job to consume data from Kafka. The Kafka brokers have SSL set up, but I'm not able to properly build/authenticate the consumer.

spark-shell command:

spark-2.3.4-bin-hadoop2.7/bin/spark-shell \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 \
    --files "spark-kafka.jaas" \
    --driver-java-options "-Djava.security.auth.login.config=./spark-kafka.jaas" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./spark-kafka.jaas"

spark-kafka.jaas

KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="myusername"
   password="mypwd"
};

Shell commands:

val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1, host2:port2")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.ssl.truststore.location", "./truststore.jks")
    .option("kafka.ssl.truststore.password", "truststore-pwd")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
    .option("subscribe", "mytopic")
    .option("startingOffsets", "earliest")
    .load()

df.show()
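As an alternative to a separate JAAS file, the Kafka client property `sasl.jaas.config` (set through Spark as `kafka.sasl.jaas.config`) can carry the login entry inline. This only works if the kafka-clients library on your classpath is 0.10.2 or newer, so check which version the `spark-sql-kafka-0-10` package pulls in before relying on it. The helper below is a hypothetical sketch; it uses `ScramLoginModule`, which is the login module Kafka's documentation pairs with the SCRAM mechanisms (the question's file names `PlainLoginModule` while the consumer sets `SCRAM-SHA-256`). Note the trailing `;` that terminates the entry.

```scala
// Hypothetical helper (not part of Spark or Kafka): builds a value for the
// Kafka client property sasl.jaas.config. Assumes kafka-clients 0.10.2+ and SCRAM credentials.
object JaasConfig {
  // Wrap a value in double quotes, escaping backslashes and quotes so the
  // value survives JAAS quoted-string parsing.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // The trailing ';' terminates the login module entry; omitting it is a syntax error.
  def scram(username: String, password: String): String =
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
      s"username=${quote(username)} password=${quote(password)};"
}
```

It would then be passed alongside the other options, e.g. `.option("kafka.sasl.jaas.config", JaasConfig.scram("myusername", "mypwd"))`, which removes the need to ship the file with `--files` and set `java.security.auth.login.config` on the driver and executors.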

Error:

2019-09-23 16:32:19 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
  at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
  at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:62)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2495)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2709)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:729)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:688)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:697)
  ... 49 elided
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: Configuration Error:
    Line 5: expected [option key]
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
  at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
  at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
  ... 99 more
Caused by: java.lang.SecurityException: java.io.IOException: Configuration Error:
    Line 5: expected [option key]
  at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
  at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at java.lang.Class.newInstance(Class.java:442)
  at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
  at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
  at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:61)
  at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
  at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
  ... 102 more
Caused by: java.io.IOException: Configuration Error:
    Line 5: expected [option key]
  at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
  at sun.security.provider.ConfigFile$Spi.match(ConfigFile.java:562)
  at sun.security.provider.ConfigFile$Spi.parseLoginEntry(ConfigFile.java:477)
  at sun.security.provider.ConfigFile$Spi.readConfig(ConfigFile.java:427)
  at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:329)
  at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:271)
  at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
  ... 116 more

Solution

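  • The message "Configuration Error: Line 5: expected [option key]" points to a syntax error in your JAAS file (spark-kafka.jaas): you're missing a semicolon after password="mypwd". JAAS syntax requires a ; to terminate each LoginModule entry (the module class, its control flag, and its options), so without it the parser reaches the } on line 5 while still expecting another option key.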
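One way to confirm this locally, before shipping the file with `--files`, is to parse it with the JDK's own JAAS parser, which is the same `ConfigFile` code that appears in the stack trace. The sketch below assumes a JDK where `com.sun.security.auth.login.ConfigFile` is available (it is in Java 8 and in the `jdk.security.auth` module of later JDKs); the object name `JaasCheck` is hypothetical. It only checks syntax; it does not load the login module class or contact the brokers.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import com.sun.security.auth.login.ConfigFile

// Hypothetical checker: parses JAAS text with the JDK parser.
object JaasCheck {
  // Returns None if the text parses, or Some(message) describing the syntax error.
  def parseError(contents: String): Option[String] = {
    val file: Path = Files.createTempFile("kafka-client", ".jaas")
    try {
      Files.write(file, contents.getBytes(StandardCharsets.UTF_8))
      new ConfigFile(file.toUri) // reads and parses eagerly; throws SecurityException on bad syntax
      None
    } catch {
      case e: SecurityException => Some(e.getMessage)
    } finally {
      Files.deleteIfExists(file)
    }
  }

  // Usage: pass one or more JAAS file paths, e.g. spark-kafka.jaas.
  def main(args: Array[String]): Unit =
    args.foreach { p =>
      val contents = new String(Files.readAllBytes(Paths.get(p)), StandardCharsets.UTF_8)
      println(s"$p: " + parseError(contents).getOrElse("OK"))
    }
}
```

Run against the file from the question, this should report the same "expected [option key]" error; after adding `;` at the end of the `password="mypwd"` line it should print OK.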