I'm trying to use Spark Structured Streaming with the availableNow trigger to ingest data from an Azure Event Hub into a Delta Lake table in Databricks.
My code:
conn_str = "my conn string"
ehConf = {
    "eventhubs.connectionString": spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str),
    "eventhubs.consumerGroup": "my-consumer-grp",
}
read_stream = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()
stream = read_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_location) \
    .trigger(availableNow=True) \
    .toTable(full_table_name, outputMode="append")
According to the documentation (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers), the availableNow trigger should process all data currently available, in a micro-batch style. However, this is not happening; instead, it processes only 1000 rows. The output of the stream tells the story:
{
  "sources" : [ {
    "description" : "org.apache.spark.sql.eventhubs.EventHubsSource@2c5bba32",
    "startOffset" : {
      "my-hub-name" : {
        "0" : 114198857
      }
    },
    "endOffset" : {
      "my-hub-name" : {
        "0" : 119649573
      }
    },
    "latestOffset" : {
      "my-hub-name" : {
        "0" : 119650573
      }
    },
    "numInputRows" : 1000,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 36.1755236407047
  } ]
}
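For reference, the progress report above comes straight from the query handle; a minimal sketch of how I pull it, assuming stream is the StreamingQuery returned by toTable in the snippet above:
# Each progress entry is a dict like the one shown above.
for progress in stream.recentProgress:
    print(progress["numInputRows"])
print(stream.lastProgress)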
We can clearly see that the offset advances by far more than the 1000 rows processed. I have verified the content of the target table: it contains only the last 1000 offsets.
According to the Event Hubs configuration for PySpark (https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration), maxEventsPerTrigger is set to 1000 * partitionCount by default. This should only affect how many events are processed per batch, though, and not the total number of records processed by the availableNow trigger.
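Just to rule that setting out, it can be raised via the options dict; a minimal sketch, where 1_000_000 is simply an arbitrarily large cap picked for the test:
# maxEventsPerTrigger is a documented option of the Event Hubs connector;
# the value below is an arbitrary large number chosen only for this test.
ehConf["maxEventsPerTrigger"] = 1_000_000

read_stream = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()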
Running the same query with the trigger set to once=True will instead ingest all of the events (assuming the batch size is set large enough).
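For completeness, that variant differs only in the trigger line; the rest of the write is identical to the snippet above:
# Same sink, but with the once trigger, which the Event Hubs connector
# does honor; everything is ingested in a single large micro-batch.
stream = read_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_location) \
    .trigger(once=True) \
    .toTable(full_table_name, outputMode="append")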
It seems like the availableNow trigger is not working as intended for Azure Event Hubs.
The 'availableNow' trigger does not seem to be implemented yet in the 'azure-event-hubs-spark' package.
But there is a workaround possible using the Kafka connector to Azure Event Hubs - https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/spark
So essentially the previous code becomes:
bootstrap_servers = "my-evh-namespace.servicebus.windows.net:9093"
eventhub_endpoint = "my-evh-namespace-endpoint"

# The 'kafkashaded' part here is because it's running in Databricks.
# Otherwise drop that part.
EH_SASL = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{eventhub_endpoint}\";"

topic = "my-eventhub-name"

read_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("subscribe", topic) \
    .option("maxOffsetsPerTrigger", 1000) \
    .option("startingOffsets", "earliest") \
    .option("includeHeaders", "true") \
    .load()
# Notice that the output writeStream remains the same.
stream = read_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_location) \
    .trigger(availableNow=True) \
    .toTable(full_table_name, outputMode="append")
This results in the stream performing as expected - ingesting all events up until the start time in batches of size maxOffsetsPerTrigger.
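If you want to double-check that everything landed, a minimal sanity check, assuming full_table_name is the same Delta table as above:
# Wait for the availableNow run to finish, then count what landed.
stream.awaitTermination()
print(spark.table(full_table_name).count())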