Tags: azure-databricks, azure-eventhub

Event Hub Throttling with the error: request was terminated because the entity is being throttled. Error code : 50002. Sub error : 102


I am using Databricks Labs Data Generator to send synthetic data to Event Hub.

Everything appears to work fine for about two minutes, but then the streaming stops with the following error:

The request was terminated because the entity is being throttled. Error code : 50002. Sub error : 102.

Can someone let me know how to address the throttling?

The code I'm using to send data to Event Hub is as follows:

import dbldatagen as dg

delay_reasons = ["Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft"]


flightdata_defn = (dg.DataGenerator(spark, name="flight_delay_data", rows=num_rows, partitions=num_partitions)
                 #.withColumn("body",StringType(), False)
                 .withColumn("flightNumber", "int", minValue=1000, uniqueValues=10000, random=True)
                 .withColumn("airline", "string", minValue=1, maxValue=500,  prefix="airline", random=True, distribution="normal")
                 .withColumn("original_departure", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
                 .withColumn("delay_minutes", "int", minValue=20, maxValue=600, distribution=dg.distributions.Gamma(1.0, 2.0))
                 .withColumn("delayed_departure",  "timestamp", expr="cast(original_departure as bigint) +  (delay_minutes * 60) ", baseColumn=["original_departure", "delay_minutes"])
                 .withColumn("reason", "string", values=delay_reasons, random=True)
                )

df_flight_data = flightdata_defn.build(withStreaming=True, options={'rowsPerSecond': 100})


from pyspark.sql import functions as F
from pyspark.sql.functions import window

streamingDelays = (
  df_flight_data
    .groupBy(
      #df_flight_data.body,
      df_flight_data.flightNumber,
      df_flight_data.airline,
      df_flight_data.original_departure,
      df_flight_data.delay_minutes,
      df_flight_data.delayed_departure,
      df_flight_data.reason,
      window(df_flight_data.original_departure, "1 hour")
    )
    .count()
)

writeConnectionString = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
checkpointLocation = "///checkpoint"

# ehWriteConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
# ehWriteConf = {
#   'eventhubs.connectionString' : writeConnectionString
# }

ehWriteConf = {
  'eventhubs.connectionString' : writeConnectionString
}

The following writes the body data from the DataFrame to Event Hubs. Events are distributed across partitions using a round-robin model:

ds = streamingDelays \
  .select(F.to_json(F.struct("*")).alias("body")) \
  .writeStream.format("eventhubs") \
  .options(**ehWriteConf) \
  .outputMode("complete") \
  .option("checkpointLocation", "...") \
  .start()

I forgot to mention that I have only 1 TU (throughput unit).



Solution

  • This is ordinary traffic throttling from Event Hubs. With 1 TU, ingress is limited to 1 MB/s or 1,000 events/s, whichever is reached first; see the quotas at https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas. You can increase the number of TUs to 2 and go from there. If you believe the throttling is unexpected, open a support ticket for the issue.
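As a quick sanity check before scaling up, you can estimate whether your planned send rate fits inside the 1-TU ingress limits from the quotas page. This is a minimal sketch; the event rates and the ~300-byte average body size are illustrative assumptions, not measurements from the question:

```python
# Hedged sketch: estimate whether a send rate fits within N throughput units (TUs).
# Per the Event Hubs quotas page, 1 TU allows up to 1 MB/s OR 1,000 events/s of
# ingress, whichever limit is hit first.

TU_COUNT = 1
MAX_BYTES_PER_SEC = TU_COUNT * 1_000_000   # 1 MB/s per TU
MAX_EVENTS_PER_SEC = TU_COUNT * 1_000      # 1,000 events/s per TU

def fits_in_tus(events_per_sec, avg_event_bytes):
    """Return True if the planned rate stays under both per-TU ingress limits."""
    return (events_per_sec <= MAX_EVENTS_PER_SEC
            and events_per_sec * avg_event_bytes <= MAX_BYTES_PER_SEC)

# 100 rows/s of ~300-byte JSON bodies fits comfortably in 1 TU.
print(fits_in_tus(100, 300))    # True
# But outputMode("complete") re-sends the whole aggregate every trigger, so the
# effective event rate grows with the result table and can blow past the limit.
print(fits_in_tus(5_000, 300))  # False
```

Note that with `outputMode("complete")` the full aggregation result is rewritten to Event Hubs on every trigger, so even a modest `rowsPerSecond` can eventually exceed 1 TU as the result table grows, which would match the stream running fine at first and then being throttled.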