I am using Databricks Labs Data Generator to send synthetic data to Event Hub.
Everything appears to work fine for about two minutes, but then the stream stops with the following error:
The request was terminated because the entity is being throttled. Error code : 50002. Sub error : 102.
Can someone tell me how to adjust the throttling?
The code I'm using to send data to Event Hub is as follows:
import dbldatagen as dg
from pyspark.sql import functions as F
from pyspark.sql.functions import window

# num_rows, num_partitions and connectionString are assumed to be defined earlier
delay_reasons = ["Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft"]
flightdata_defn = (
    dg.DataGenerator(spark, name="flight_delay_data", rows=num_rows, partitions=num_partitions)
    # .withColumn("body", StringType(), False)
    .withColumn("flightNumber", "int", minValue=1000, uniqueValues=10000, random=True)
    .withColumn("airline", "string", minValue=1, maxValue=500, prefix="airline", random=True, distribution="normal")
    .withColumn("original_departure", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
    .withColumn("delay_minutes", "int", minValue=20, maxValue=600, distribution=dg.distributions.Gamma(1.0, 2.0))
    .withColumn("delayed_departure", "timestamp", expr="cast(original_departure as bigint) + (delay_minutes * 60) ", baseColumn=["original_departure", "delay_minutes"])
    .withColumn("reason", "string", values=delay_reasons, random=True)
)
df_flight_data = flightdata_defn.build(withStreaming=True, options={'rowsPerSecond': 100})
streamingDelays = (
    df_flight_data
    .groupBy(
        # df_flight_data.body,
        df_flight_data.flightNumber,
        df_flight_data.airline,
        df_flight_data.original_departure,
        df_flight_data.delay_minutes,
        df_flight_data.delayed_departure,
        df_flight_data.reason,
        window(df_flight_data.original_departure, "1 hour")
    )
    .count()
)
# Encrypt the connection string with the Event Hubs connector's helper before passing it in
writeConnectionString = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
checkpointLocation = "///checkpoint"
ehWriteConf = {
    'eventhubs.connectionString': writeConnectionString
}
ds = streamingDelays \
.select(F.to_json(F.struct("*")).alias("body")) \
.writeStream.format("eventhubs") \
.options(**ehWriteConf) \
.outputMode("complete") \
.option("checkpointLocation", "...") \
.start()
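One factor worth checking: with outputMode("complete"), Structured Streaming rewrites the entire aggregated result table on every trigger. Because the grouping keys above (flight number, airline, minute-level timestamps and so on) make nearly every generated row its own group, each trigger sends more events to Event Hubs than the previous one. The pure-Python model below is a rough sketch of that growth, not a measurement; the one-group-per-row and one-second-per-trigger assumptions are simplifications, and "update" mode is included only for comparison.

```python
# Rough model of how many events reach Event Hubs per trigger under each output mode.
# Assumptions (not measured): every generated row forms its own group, and
# each micro-batch trigger picks up exactly one second of input.

ROWS_PER_SECOND = 100  # matches options={'rowsPerSecond': 100} in the question


def events_in_trigger(trigger_index: int, mode: str) -> int:
    """Rows written by the trigger_index-th micro-batch (1-based)."""
    if trigger_index < 1:
        raise ValueError("trigger_index is 1-based")
    if mode == "complete":
        # The entire result table (every group seen so far) is re-emitted.
        return ROWS_PER_SECOND * trigger_index
    if mode == "update":
        # Only groups whose counts changed in this batch are emitted.
        return ROWS_PER_SECOND
    raise ValueError(f"unsupported output mode: {mode}")


def total_events(num_triggers: int, mode: str) -> int:
    """Cumulative rows written after num_triggers micro-batches."""
    return sum(events_in_trigger(i, mode) for i in range(1, num_triggers + 1))


if __name__ == "__main__":
    for mode in ("complete", "update"):
        print(mode, events_in_trigger(120, mode), total_events(120, mode))
```

Under these assumptions, the 120th trigger in complete mode writes 12000 events (726000 in total so far), versus 100 per trigger (12000 in total) in update mode.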
I forgot to mention that I have 1 throughput unit (TU).
This is the usual traffic throttling from Event Hubs. Take a look at the limits for 1 TU at https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas. You can increase the number of TUs to 2 and go from there. If you think the throttling is unexpected, open a support ticket for the issue.
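To estimate how many TUs a given load needs, compare it against both per-TU ingress limits: 1 TU allows up to 1 MB per second or 1,000 events per second, whichever is reached first. The function below is a minimal sketch of that comparison; treating "1 MB" as 1,048,576 bytes and supplying an average event size are assumptions, and actual sizing should follow the quotas page.

```python
import math

# Assumed ingress limits per throughput unit (see the Event Hubs quotas page).
EVENTS_PER_SEC_PER_TU = 1000
BYTES_PER_SEC_PER_TU = 1024 * 1024  # "1 MB per second"; binary MB is an assumption


def required_tus(events_per_sec: float, avg_event_bytes: float) -> int:
    """Smallest TU count whose event-count and byte limits both cover the load."""
    by_count = math.ceil(events_per_sec / EVENTS_PER_SEC_PER_TU)
    by_bytes = math.ceil(events_per_sec * avg_event_bytes / BYTES_PER_SEC_PER_TU)
    # A namespace always has at least one TU.
    return max(1, by_count, by_bytes)
```

For example, required_tus(1500, 300) returns 2: 1,500 events per second exceeds the 1,000-event limit of a single TU, even though 450,000 bytes per second is well under 1 MB.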