Tags: apache-spark, pyspark, databricks, azure-eventhub

Spark Streaming AvailableNow trigger for Azure Event Hub?


I'm trying to use Spark Structured Streaming with the availableNow trigger to ingest data from an Azure Event Hub into a Delta Lake table in Databricks.

My code:

conn_str = "my conn string"
checkpoint_location = "my checkpoint path"
full_table_name = "my_catalog.my_schema.my_table"
ehConf = {
  "eventhubs.connectionString": 
    spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str),
  "eventhubs.consumerGroup":
    "my-consumer-grp",
}

read_stream = spark.readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

stream = read_stream.writeStream \
  .format("delta") \
  .option("checkpointLocation", checkpoint_location) \
  .outputMode("append") \
  .trigger(availableNow=True) \
  .toTable(full_table_name)

According to the documentation (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers), the availableNow trigger should process all data that is currently available, in micro-batch style.
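
For comparison, here is how the guide's trigger modes are expressed in PySpark (availableNow requires Spark 3.3 or later; a query uses only one of them):

# The three trigger modes from the Structured Streaming guide:
writer = read_stream.writeStream.format("delta")
writer.trigger(processingTime="10 seconds")  # a micro-batch every 10 seconds, runs indefinitely
writer.trigger(once=True)                    # exactly one micro-batch, then stop
writer.trigger(availableNow=True)            # all currently available data,
                                             # possibly split into several micro-batches, then stop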

However, this is not happening; instead, the query processes only 1000 rows. The streaming progress output tells the story:

{
  "sources" : [ {
    "description" : "org.apache.spark.sql.eventhubs.EventHubsSource@2c5bba32",
    "startOffset" : {
      "my-hub-name" : {
        "0" : 114198857
      }
    },
    "endOffset" : {
      "my-hub-name" : {
        "0" : 119649573
      }
    },
    "latestOffset" : {
      "my-hub-name" : {
        "0" : 119650573
      }
    },
    "numInputRows" : 1000,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 36.1755236407047
  } ]
}

We can clearly see that the offset advances by far more than the 1000 rows processed (endOffset - startOffset = 119,649,573 - 114,198,857 = 5,450,716 events).
I have verified the contents of the target table; it contains the last 1000 offsets.

According to the Event Hubs configuration documentation for PySpark (https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration), maxEventsPerTrigger defaults to 1000 * partitionCount. However, this should only affect how many events are processed per micro-batch, not the total number of records processed by the availableNow trigger.

Running the same query with .trigger(once=True) instead ingests all of the events (assuming the batch size is set large enough), as in the sketch below.
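
For reference, a minimal sketch of that once=True variant; maxEventsPerTrigger is the rate-limit key from the configuration page linked above (treat the exact key as an assumption), and the value is an arbitrary placeholder that must exceed the backlog size:

# trigger(once=True) processes a single micro-batch, so the connector's rate
# limit must be raised above the number of pending events.
ehConf["maxEventsPerTrigger"] = 10_000_000  # placeholder, must exceed the backlog

stream = spark.readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load() \
  .writeStream \
  .format("delta") \
  .option("checkpointLocation", checkpoint_location) \
  .outputMode("append") \
  .trigger(once=True) \
  .toTable(full_table_name)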

It seems like the availableNow trigger is not working as intended for Azure Event Hubs.


Solution

  • The 'availableNow' trigger seems not to be implemented yet in the 'azure-event-hubs-spark' package.

    But there is a possible workaround using the Kafka connector for Azure Event Hubs - https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/spark

    So essentially the previous code becomes:

    bootstrap_servers = "my-evh-namespace.servicebus.windows.net:9093"
    # The Event Hubs connection string; it is used as the SASL password below,
    # with the literal username "$ConnectionString".
    eventhub_endpoint = "my-evh-namespace-endpoint"

    # The 'kafkashaded' prefix is needed on Databricks, where the Kafka client
    # classes are shaded; drop it when running outside Databricks.
    EH_SASL = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{eventhub_endpoint}";'

    topic = "my-eventhub-name"
    
    read_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", EH_SASL) \
        .option("subscribe", topic) \
        .option("maxOffsetsPerTrigger", 1000) \
        .option("startingOffsets", "earliest") \
        .option("includeHeaders", "true") \
        .load()
    
    # Notice that the output writeStream remains the same.
    stream = read_stream.writeStream \
        .format("delta") \
        .option("checkpointLocation", checkpoint_location) \
        .outputMode("append") \
        .trigger(availableNow=True) \
        .toTable(full_table_name)
    

    This results in the stream performing as expected: it ingests all events available up to the query start time, in batches of size maxOffsetsPerTrigger.
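
    One difference worth noting: the Kafka source exposes the payload as a binary value column (plus key, headers, and Kafka metadata) rather than the eventhubs source's body column, so the payload typically needs an explicit cast. A minimal sketch, assuming the events are UTF-8 text:

    from pyspark.sql.functions import col

    # Decode the binary Kafka value into the event body. The record timestamp
    # should correspond to the Event Hubs enqueued time, and the headers
    # column is available because includeHeaders is set to "true" above.
    decoded = read_stream.select(
        col("value").cast("string").alias("body"),
        col("timestamp").alias("enqueued_time"),
        col("headers"),
    )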