apache-spark pyspark apache-kafka spark-structured-streaming azure-eventhub

Why is my Kafka/Event Hub stream read getting stuck?


I am trying to read from Event Hub with the following code:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", bootstrap_server) \
  .option("subscribe", topic) \
  .option("startingOffsets", "latest") \
  .option("maxOffsetsPerTrigger", 1000) \
  .load()

display(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

When I execute the display, it gets stuck on 'Stream initializing...'

And by the way, I am able to successfully write to this same Event Hub.

Any suggestions for what to check? It just hangs like this forever.

Also I should mention, I am brand new to Kafka and Event Hub. I simply created the Event Hub resource on Azure. Is there any coding or configuration that I need to do on the Event Hub side to allow consumers to receive messages?


Solution

  • Figured it out.

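    Looks like I was missing the authentication options. The Event Hubs Kafka endpoint only accepts SASL_SSL connections, so without them the consumer never finished connecting and the stream stayed on 'Stream initializing...'.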

    Here is what worked:

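    df = (spark
      .readStream
      .format("kafka")
      .option("subscribe", topic)
      .option("kafka.bootstrap.servers", bootstrap_server)
      # Event Hubs' Kafka endpoint requires SASL over TLS with the PLAIN mechanism
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      # Despite the variable name, this must be a full JAAS config line
      # (login module + username + password), not the bare connection string
      .option("kafka.sasl.jaas.config", connection_string)
      # More generous timeouts for the remote broker
      .option("kafka.request.timeout.ms", "60000")
      .option("kafka.session.timeout.ms", "30000")
      .load())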
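
For anyone unsure what goes into `kafka.sasl.jaas.config`, here is a minimal sketch of one way to build it from an Event Hubs connection string. The helper name `eventhubs_kafka_options` and the sample connection string are made up for illustration. It assumes the usual Event Hubs convention of the literal username `$ConnectionString`, the connection string as the password, and port 9093. The `kafkashaded.` prefix is, as far as I know, only needed on Databricks, where the Kafka client classes are shaded; treat that as an assumption to verify on your runtime.

```python
from urllib.parse import urlparse


def eventhubs_kafka_options(connection_string: str, shaded: bool = False) -> dict:
    """Build Kafka source options for an Event Hubs namespace (hypothetical helper)."""
    # Connection strings look like "Endpoint=sb://<ns>.servicebus.windows.net/;Key=Value;..."
    parts = dict(
        item.split("=", 1)  # split once: the access key itself may end in "="
        for item in connection_string.strip(";").split(";")
        if item
    )
    host = urlparse(parts["Endpoint"]).hostname

    module = "org.apache.kafka.common.security.plain.PlainLoginModule"
    if shaded:
        # Assumption: Databricks runtimes relocate Kafka classes under "kafkashaded."
        module = "kafkashaded." + module

    jaas = (
        f"{module} required "
        f'username="$ConnectionString" '
        f'password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": f"{host}:9093",  # Kafka endpoint port on Event Hubs
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


# Usage sketch (needs an active SparkSession named `spark` and a real topic):
# opts = eventhubs_kafka_options(my_connection_string, shaded=True)
# df = spark.readStream.format("kafka").options(**opts).option("subscribe", topic).load()
```

Keeping the options in one dict makes it easy to reuse the same settings for both the read and the write side, and keeps the secret out of the query definition itself.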