azure, azure-databricks, azure-eventhub

Unable to instantiate EventHubsSourceProvider in Azure Databricks


I am trying to read data from an Event Hub in Azure Databricks using the following code.

from pyspark.sql.functions import *
from pyspark.sql.types import *

NAMESPACE_NAME = "*myEventHub*"
KEY_NAME = "*MyPolicyName*"
KEY_VALUE = "*MySharedAccessKey*"

connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=ingestion".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)

ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

It's throwing this error:

java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.streaming.eventhubs.EventHubsSourceProvider could not be instantiated

I am using Databricks Runtime 14.2 (Scala 2.12, Spark 3.5.0) and have installed the com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22 package. Can someone please help resolve this error?


Solution

  • Uninstall the current package and restart the cluster. Then install the package again and run the code with an encrypted connection string, as shown below.

    connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=jgsevents".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)
    
    ehConf = {}
    ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
    

    Output:

    df.select(col("body").cast("string")).display()
    
    body
    [ { "key1": "value1", "key2": "value2", "key3": "value3", "nestedKey": { "nestedKey1": "nestedValue1" }, "arrayKey": [ "arrayValue1", "arrayValue2" ] }]

    For more information, refer to this.
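As a side note, the body column arrives as raw bytes and is cast to a string in the output above; the JSON it contains can then be parsed. Below is a minimal plain-Python sketch of that parsing step (the sample payload is copied from the output above; in a real streaming job you would typically apply from_json with a StructType schema to the DataFrame instead):

```python
import json

# Sample payload copied from the output above: a JSON array with one object
body = ('[ { "key1": "value1", "key2": "value2", "key3": "value3", '
        '"nestedKey": { "nestedKey1": "nestedValue1" }, '
        '"arrayKey": [ "arrayValue1", "arrayValue2" ] }]')

records = json.loads(body)               # parse the array of event objects
first = records[0]
print(first["key1"])                     # → value1
print(first["nestedKey"]["nestedKey1"])  # → nestedValue1
```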