I am trying to parse JSON messages with Pyspark from an Azure Eventhub with enabled Kafka compatibility. I can't find any documentation on how to establish the connection.
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
sc.stop() # Jupyter somehow created a context already..
sc = SparkContext(appName="PythonTest")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
# my connection string:
#Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=examplekeyname;SharedAccessKey=HERETHEJEY=;EntityPath=examplepathname - has a total of 5 partitions
kafkaStream = KafkaUtils.createStream(HOW DO I STRUCTURE THIS??)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()
See my answer (and question) here. That was for how to write to an Kafka-enabled Event Hub in pyspark but I assume reading config should be pretty similar. The tricky part was to get the security configuration right.
EH_SASL = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
// Source: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/spark#running-spark
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
You can find any official tutorial on how to set up a consumer here. It's for Scala instead of PySpark but it's fairly easy to transform the code if you compare it with my example.