Tags: azure, databricks, azure-databricks, delta-lake, databricks-autoloader

Azure Databricks is unable to create an Event Grid Subscription for Autoloader Streams


I am trying to create an autoloader stream in Azure Databricks. When I try to start the writeStream, I am presented with an exception saying:

com.databricks.sql.cloudfiles.errors.CloudFilesException: Failed to create an Event Grid subscription. Please make sure that your service principal has 'write' permissions (e.g., assign it a Contributor role) on the storage account rahulstorageek in order to create Event Grid Subscriptions


Following is the code I am using:

# spark and cloudFiles configurations
spark.conf.set("fs.azure.account.key.<My ADLS Gen2 Storage account name>.blob.core.windows.net",
               "<Access Key 2 of my Storage Account>")
 
queuesas = "<SAS Connection String for Queue Storage>"
 
cloudfilesConf = {
    "cloudFiles.subscriptionId": "<Azure Free Trial Subscription Id>",
    "cloudFiles.connectionString": queuesas,
    "cloudFiles.format" : "csv",
    "cloudFiles.tenantId": "<Service Principals tenant Id>",
    "cloudFiles.clientId": "<Service Principals client Id>",
    "cloudFiles.clientSecret": "<Service Principals generated client secret Value>",
    "cloudFiles.resourceGroup" : "AzureDataBricks_Exploration_RG",
    "cloudFiles.useNotifications": "true",
    "cloudFiles.includeExistingFiles": "true",
    "cloudFiles.validateOptions": "true",
}

# Creating manual schema of incoming data
from pyspark.sql.functions import *
from pyspark.sql.types import *
 
dataset_schema = StructType([
    StructField("YearsExperience", DoubleType(), True),
    StructField("Salary", IntegerType(), True)])


# Autoloader ReadStream
autoloader_df = (spark.readStream.format("cloudFiles")
      .options(**cloudfilesConf)
      .option("recursiveFileLookup","true")
      .schema(dataset_schema)
      .load("/mnt/autoloadersource/csv_files/")
      )

# Autoloader Writestream
(autoloader_df.writeStream
 .format("delta")
 .option("mergeSchema", "true")
 .option("checkpointLocation", "/mnt/autoloadersink/autostream_chckpnt")
 .start("/mnt/autoloadersink/autoloader_dt01"))
## The exception is raised after executing the command above.

I have assigned the roles shown in the screenshot (omitted here) to the service principal I am using.

Additionally, the SAS token I generated for the queue was created with the parameters shown in the screenshot (omitted here).
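
For reference, a queue-service SAS connection string like the `queuesas` above can be produced programmatically. Below is a minimal sketch using the azure-storage-queue SDK; the permission set and expiry are illustrative assumptions, not the exact values from my token:

# Minimal sketch (assumptions): building a queue-service SAS connection string
# with the azure-storage-queue SDK. The permissions and expiry below are
# illustrative, not the exact values from the screenshot.
from datetime import datetime, timedelta

from azure.storage.queue import (
    AccountSasPermissions,
    ResourceTypes,
    generate_account_sas,
)

account_name = "rahulstorageek"
account_key = "<Access Key 2 of my Storage Account>"

sas_token = generate_account_sas(
    account_name=account_name,
    account_key=account_key,
    resource_types=ResourceTypes(service=True, container=True, object=True),
    # Auto Loader's queue client needs to create, read, update, and delete messages.
    permission=AccountSasPermissions(
        read=True, write=True, delete=True, list=True,
        add=True, create=True, update=True, process=True,
    ),
    expiry=datetime.utcnow() + timedelta(days=7),
)

# Connection-string form accepted by cloudFiles.connectionString
queuesas = (
    "DefaultEndpointsProtocol=https;"
    f"AccountName={account_name};"
    f"SharedAccessSignature={sas_token}"
)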

I have tried granting the service principal all of the additional roles you can see in the screenshots above, but I am still getting the same error.

Any solutions or suggestions would be highly valued.


Solution

  • Your permissions aren't sufficient, because an Event Grid subscription needs to be created. The documentation clearly specifies the necessary roles (a scripted way to assign them is sketched after this list):

    • On the storage account you need:

      • Contributor: used for setting up resources in your storage account, such as queues and event subscriptions.
      • Storage Queue Data Contributor: used to perform queue operations such as retrieving and deleting messages from the queues. (May not be required on DBR 8.1+ if you provide a connection string.)
    • On the resource group:

      • EventGrid EventSubscription Contributor: used to perform Event Grid subscription operations.
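
If you prefer to script these assignments instead of clicking through the portal, here is a minimal sketch using the azure-identity and azure-mgmt-authorization SDKs. The role GUIDs are the well-known Azure built-in role IDs, but treat the exact model fields as assumptions and verify them against your installed SDK version (e.g., with `az role definition list`):

# Minimal sketch (assumptions): assigning the roles listed above to the
# service principal. Scope strings reuse the names from the question.
import uuid

from azure.identity import AzureCliCredential
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.authorization.models import RoleAssignmentCreateParameters

subscription_id = "<Azure Free Trial Subscription Id>"
sp_object_id = "<Service Principal object Id>"  # object id, not the client (application) id

# Well-known built-in role definition GUIDs; verify with `az role definition list`.
CONTRIBUTOR = "b24988ac-6180-42a0-ab88-20f7382dd24c"
STORAGE_QUEUE_DATA_CONTRIBUTOR = "974c5e8b-45b9-4653-ba55-5f855dd0fb88"
EVENTGRID_EVENTSUBSCRIPTION_CONTRIBUTOR = "428e0ff0-5e57-4d9c-a221-2c70d0e0a443"

rg_scope = f"/subscriptions/{subscription_id}/resourceGroups/AzureDataBricks_Exploration_RG"
storage_scope = f"{rg_scope}/providers/Microsoft.Storage/storageAccounts/rahulstorageek"

client = AuthorizationManagementClient(AzureCliCredential(), subscription_id)

def assign(scope: str, role_definition_guid: str) -> None:
    """Create one role assignment for the service principal at the given scope."""
    client.role_assignments.create(
        scope=scope,
        role_assignment_name=str(uuid.uuid4()),  # assignment names are caller-chosen GUIDs
        parameters=RoleAssignmentCreateParameters(
            role_definition_id=(
                f"/subscriptions/{subscription_id}"
                f"/providers/Microsoft.Authorization/roleDefinitions/{role_definition_guid}"
            ),
            principal_id=sp_object_id,
            principal_type="ServicePrincipal",
        ),
    )

# Storage-account-level roles
assign(storage_scope, CONTRIBUTOR)
assign(storage_scope, STORAGE_QUEUE_DATA_CONTRIBUTOR)
# Resource-group-level role
assign(rg_scope, EVENTGRID_EVENTSUBSCRIPTION_CONTRIBUTOR)

Once the assignments have propagated (this can take a few minutes), restarting the writeStream should let Auto Loader create the Event Grid subscription.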