I am trying to read data from an Event Hub in Azure Databricks using the following code:
from pyspark.sql.functions import *
from pyspark.sql.types import *
NAMESPACE_NAME = "*myEventHub*"
KEY_NAME = "*MyPolicyName*"
KEY_VALUE = "*MySharedAccessKey*"
connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=ingestion".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)
ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
It throws this error:
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.streaming.eventhubs.EventHubsSourceProvider could not be instantiated
I am using Databricks Runtime 14.2 (Scala 2.12, Spark 3.5.0) and have installed the com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22 package. Can someone please help me resolve this error?
Uninstall the current package and restart the cluster. Then install the package again and run the code with an encrypted connection string, as shown below:
connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=jgsevents".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
Output:
df.select(col("body").cast("string")).display()
| body |
| --- |
| [ { "key1": "value1", "key2": "value2", "key3": "value3", "nestedKey": { "nestedKey1": "nestedValue1" }, "arrayKey": [ "arrayValue1", "arrayValue2" ] } ] |
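Each event's `body` column is a byte payload; casting it to a string yields a JSON document like the row above. As a quick local illustration (plain Python, no Spark required; the sample payload simply mirrors the output shown), you can decode and inspect such a body:

```python
import json

# Sample payload mirroring the Event Hub body shown above (illustrative only)
body = '''[ { "key1": "value1", "key2": "value2", "key3": "value3",
              "nestedKey": { "nestedKey1": "nestedValue1" },
              "arrayKey": [ "arrayValue1", "arrayValue2" ] } ]'''

events = json.loads(body)   # the body is a JSON array of event objects
record = events[0]

print(record["key1"])                       # top-level field
print(record["nestedKey"]["nestedKey1"])    # nested object field
print(record["arrayKey"])                   # array field
```

Inside a Spark job you would typically do the equivalent with `from_json` and an explicit schema instead of plain `json.loads`.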
For more information, refer to this.