I'm trying to set up a Spark job to consume data from Kafka. The Kafka brokers have SSL set up, but I'm not able to properly build/authenticate the consumer.
`spark-shell` command:
# Launch spark-shell with the Kafka SQL connector package, ship the JAAS file
# via --files, and pass -Djava.security.auth.login.config to both the driver
# and the executor JVMs. The trailing backslashes keep this a single command.
spark-2.3.4-bin-hadoop2.7/bin/spark-shell \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 \
  --files "spark-kafka.jaas" \
  --driver-java-options "-Djava.security.auth.login.config=./spark-kafka.jaas" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./spark-kafka.jaas"
spark-kafka.jaas
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="myusername"
password="mypwd"
};
Shell commands:
// Settings handed to the "kafka" data source. Keys prefixed with "kafka."
// carry the broker list, SASL_SSL protocol, truststore and SASL mechanism;
// the remaining keys select the topic and the starting offsets.
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "host1:port1, host2:port2",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.ssl.truststore.location" -> "./truststore.jks",
  "kafka.ssl.truststore.password" -> "truststore-pwd",
  "kafka.ssl.endpoint.identification.algorithm" -> "",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-256",
  "subscribe" -> "mytopic",
  "startingOffsets" -> "earliest"
)

// Batch read (not a stream) of the subscribed topic.
val df = spark.read.format("kafka").options(kafkaOptions).load()

// Triggers the actual read and prints the first rows.
df.show()
Error:
2019-09-23 16:32:19 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:62)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2495)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2709)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at org.apache.spark.sql.Dataset.show(Dataset.scala:729)
at org.apache.spark.sql.Dataset.show(Dataset.scala:688)
at org.apache.spark.sql.Dataset.show(Dataset.scala:697)
... 49 elided
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: Configuration Error:
Line 5: expected [option key]
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 99 more
Caused by: java.lang.SecurityException: java.io.IOException: Configuration Error:
Line 5: expected [option key]
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:61)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 102 more
Caused by: java.io.IOException: Configuration Error:
Line 5: expected [option key]
at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
at sun.security.provider.ConfigFile$Spi.match(ConfigFile.java:562)
at sun.security.provider.ConfigFile$Spi.parseLoginEntry(ConfigFile.java:477)
at sun.security.provider.ConfigFile$Spi.readConfig(ConfigFile.java:427)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:329)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:271)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
... 116 more
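The message "Configuration Error: Line 5: expected [option key]" indicates a problem with your JAAS configuration file (`spark-kafka.jaas`).
Most likely you're missing a semicolon after the `password=...` option.
JAAS syntax requires a `;` to terminate each LoginModule entry in that file, so the last option line should read `password="mypwd";`.
Without it, the parser reaches the closing `};` on line 5 while it is still expecting another option key, which is exactly what the error reports.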