azure, pyspark, apache-kafka, azure-eventhub

How to format a PySpark connection string for Azure Event Hub with Kafka


I am trying to parse JSON messages with PySpark from an Azure Event Hub with Kafka compatibility enabled. I can't find any documentation on how to establish the connection.

import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

sc.stop() # Jupyter somehow already created a context
sc = SparkContext(appName="PythonTest")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)

# my connection string: 
#Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=examplekeyname;SharedAccessKey=HERETHEJEY=;EntityPath=examplepathname - has a total of 5 partitions

kafkaStream = KafkaUtils.createStream(HOW DO I STRUCTURE THIS??)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()

Solution

  • See my answer (and question) here. That was about how to write to a Kafka-enabled Event Hub from PySpark, but I assume the read config should be fairly similar. The tricky part was getting the security configuration right.

    EH_SASL = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
    # The JAAS username is the literal string "$ConnectionString"; the password is the full Event Hub connection string.
    # Source: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/spark#running-spark

    dfKafka \
        .write \
        .format("kafka") \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", EH_SASL) \
        .option("kafka.batch.size", 5000) \
        .option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
        .option("kafka.request.timeout.ms", 120000) \
        .option("topic", "raw") \
        .option("checkpointLocation", "/mnt/telemetry/cp.txt") \
        .save()
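
    For reading, the same security options apply; the question's connection string just needs to be split into the individual Kafka settings. A small sketch of that mapping, using the placeholder values from the question (not real credentials):

    conn_str = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=examplekeyname;SharedAccessKey=HERETHEJEY=;EntityPath=examplepathname"

    # The JAAS username is the literal string "$ConnectionString"; the password is the whole connection string.
    EH_SASL = ('org.apache.kafka.common.security.plain.PlainLoginModule required '
               'username="$ConnectionString" password="' + conn_str + '";')

    bootstrap_servers = "example.servicebus.windows.net:9093"  # namespace host plus the Kafka port 9093
    topic = "examplepathname"                                  # the EntityPath is the Kafka topic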
    

    You can find the official tutorial on how to set up a consumer here. It's written for Scala instead of PySpark, but it's fairly easy to adapt the code if you compare it with my example; the sketch below applies it to the question's setup.
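
    A minimal consumer sketch for the question might look like the following. It uses Structured Streaming rather than the KafkaUtils DStream API from the question, since the old Kafka 0.8 integration behind KafkaUtils has no support for the SASL_SSL settings Event Hubs requires. It assumes the EH_SASL, bootstrap_servers and topic values from the mapping above, the spark-sql-kafka-0-10 connector package on the classpath, and a hypothetical message schema (deviceId, value) that you would replace with the real shape of your JSON.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = SparkSession.builder.appName("PythonTest").getOrCreate()

    # Hypothetical schema -- replace with the actual fields of your messages.
    schema = StructType([
        StructField("deviceId", StringType()),
        StructField("value", DoubleType()),
    ])

    df = spark.readStream \
        .format("kafka") \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", EH_SASL) \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribe", topic) \
        .load()

    # Kafka delivers the message value as bytes: cast it to string, then parse the JSON.
    parsed = df.select(from_json(col("value").cast("string"), schema).alias("msg"))

    query = parsed.writeStream \
        .outputMode("append") \
        .format("console") \
        .start()
    query.awaitTermination()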