I am trying to read from an Azure Event Hub through its Kafka endpoint, using Spark Structured Streaming, with the following code:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_server) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 1000) \
    .load()
display(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))
When I run the display call, it gets stuck on 'Stream initializing...' and never shows any rows.
And by the way, I am able to successfully write to this same Event Hub.
Any suggestions for what to check? It just hangs like this forever.
Also I should mention, I am brand new to Kafka and Event Hub. I simply created the Event Hub resource on Azure. Is there any coding or configuration that I need to do on the Event Hub side to allow consumers to receive messages?
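Figured it out.
I was missing the authentication options. The Event Hubs Kafka endpoint only accepts SASL_SSL connections using the PLAIN mechanism, so without those options the consumer never gets connected, which would explain why the stream sat on 'Stream initializing...'.
Here is what worked. Note that connection_string has to hold the full JAAS config line (login module, username "$ConnectionString", and the Event Hubs connection string as the password), not just the raw connection string: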
df = (spark
    .readStream
    .format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", bootstrap_server)
    # Authentication options that were missing from the first attempt
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", connection_string)
    # Timeouts used in the working configuration
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .load())
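For anyone else hitting this, here is a rough sketch of how the values for bootstrap_server and kafka.sasl.jaas.config could be derived from an Event Hubs connection string. The helper names (parse_endpoint_host, build_kafka_options) are made up for illustration and are not part of any library. It assumes the namespace-level connection string format "Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=..." and the Kafka endpoint port 9093. The "kafkashaded." prefix on the login module is an assumption that applies to Databricks runtimes; on plain Apache Spark, pass shaded=False.

```python
def parse_endpoint_host(connection_string: str) -> str:
    """Extract the namespace host from an Event Hubs connection string."""
    # Split "Key=Value;Key=Value" pairs; maxsplit=1 keeps '=' inside key values intact.
    parts = dict(
        segment.split("=", 1)
        for segment in connection_string.strip().split(";")
        if segment
    )
    endpoint = parts["Endpoint"]  # e.g. sb://<namespace>.servicebus.windows.net/
    if endpoint.startswith("sb://"):
        endpoint = endpoint[len("sb://"):]
    return endpoint.rstrip("/")


def build_kafka_options(connection_string: str, shaded: bool = True) -> dict:
    """Build the Kafka source options for the Event Hubs Kafka endpoint.

    shaded=True assumes a Databricks runtime, where the Kafka client classes
    are relocated under the 'kafkashaded.' package prefix.
    """
    module = "org.apache.kafka.common.security.plain.PlainLoginModule"
    if shaded:
        module = "kafkashaded." + module
    # The username is the literal string $ConnectionString;
    # the password is the connection string itself.
    jaas_config = (
        f'{module} required username="$ConnectionString" '
        f'password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": parse_endpoint_host(connection_string) + ":9093",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
    }
```

The resulting dict could then be passed in one go, for example spark.readStream.format("kafka").options(**opts).option("subscribe", topic).load(), where opts is the return value of build_kafka_options. In practice the connection string would come from a secret store rather than being hard-coded in the notebook.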