I am trying to read data from Kafka via Spark Structured Streaming. However, in Spark 2.4.0 you cannot set the group id for the stream (see "How to set group.id for consumer group in kafka data source in Structured Streaming?").
Because the group id cannot be set, Spark generates one itself, and I am stuck at a GroupAuthorizationException:
19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2
Any ideas on how to get around this, please? The funny thing is that I am able to read this data via kafka-console-consumer.sh, where I can pass the group id in a .properties file.
Code throwing the exception:
val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  // Not honored in Spark 2.4.0: Spark generates its own group id instead
  .option("kafka.group.id", "idThatShouldBeUsed")
  .option("kafka.bootstrap.servers", "server")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/location")
  .option("kafka.ssl.truststore.password", "pass")
  .option("kafka.sasl.jaas.config", """jaasToUse""")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("startingOffsets", "earliest")
  .start().awaitTermination()
It seems that this cannot be solved from the consumer's side. We ended up using bin/kafka-acls.sh with a prefixed (wildcard-style) resource pattern to allow all group ids generated by Structured Streaming.
Kafka ACL example:
bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zk:2181 --add --allow-principal User:'Bon' --operation READ --topic topicName --group='spark-kafka-source-' --resource-pattern-type prefixed
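To illustrate why the prefixed ACL above covers the generated group id, here is a minimal sketch in plain Scala. It assumes that a prefixed resource pattern matches any group name starting with the given prefix; `PrefixedAclSketch` and `matchesPrefixed` are hypothetical names made up for this illustration and are not part of Kafka or Spark.

```scala
// Hypothetical helper for illustration only (not a Kafka or Spark API).
// Assumption: a "prefixed" resource pattern matches every group name
// that starts with the configured prefix.
object PrefixedAclSketch {
  def matchesPrefixed(aclPrefix: String, groupId: String): Boolean =
    groupId.startsWith(aclPrefix)

  def main(args: Array[String]): Unit = {
    // Group id taken from the exception message in the question
    val generated =
      "spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2"

    // The prefix granted by the ACL covers the generated id
    println(matchesPrefixed("spark-kafka-source-", generated))

    // An ACL on the intended fixed id would not cover it
    println(matchesPrefixed("idThatShouldBeUsed", generated))
  }
}
```

Under that assumption, the first check succeeds and the second fails, which is consistent with the exception: an ACL that only allows the fixed id never matches the id Spark generates.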