Tags: apache-spark, pyspark, apache-kafka, spark-streaming, spark-structured-streaming

How is spark.streaming.kafka.maxRatePerPartition related to spark.streaming.backpressure.enabled in case of Spark Streaming with Kafka?


I am trying to write data into a Kafka topic after reading a Hive table, as below.

write_kafka_data.py:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))

final_df.write.format("kafka")\
        .option("kafka.bootstrap.servers", kafka_broker)\
        .option("kafka.batch.size", 51200)\
        .option("retries", 3)\
        .option("kafka.max.request.size", 500000)\
        .option("kafka.max.block.ms", 120000)\
        .option("kafka.metadata.max.age.ms", 120000)\
        .option("kafka.request.timeout.ms", 120000)\
        .option("kafka.linger.ms", 0)\
        .option("kafka.delivery.timeout.ms", 130000)\
        .option("acks", "1")\
        .option("kafka.compression.type", "snappy")\
        .option("kafka.security.protocol", "SASL_SSL")\
        .option("kafka.sasl.jaas.config", oauth_config)\
        .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
        .option("kafka.sasl.mechanism", "OAUTHBEARER")\
        .option("topic", 'topic_name')\
        .save()

After a successful write (29,000 records), I am reading data from the same topic in another file, read_kafka_data.py:

    # IMPORTS (needed for this snippet)
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    # SCHEMA
    schema = StructType([
            StructField("col1", StringType()),
            StructField("col2", IntegerType())
    ])

    # READ FROM TOPIC
    jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
                          + " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
                          + " oauth.client.id=" + '"' + "client_id" + '"' \
                          + " oauth.client.secret=" + '"' + "secret_key" + '" ;'

    stream_df = spark.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', kafka_broker) \
            .option('subscribe', 'topic_name') \
            .option('kafka.security.protocol', 'SASL_SSL') \
            .option('kafka.sasl.mechanism', 'OAUTHBEARER') \
            .option('kafka.sasl.jaas.config', jaas_config) \
            .option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
            .option('startingOffsets', 'latest') \
            .option('group.id', 'group_id') \
            .option('maxOffsetsPerTrigger', 200) \
            .option('fetchOffset.retryIntervalMs', 200) \
            .option('fetchOffset.numRetries', 3) \
            .load()\
            .select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')

    # WRITE TO HIVE (HiveWarehouseSession comes from the Hive Warehouse Connector,
    # e.g. `from pyspark_llap import HiveWarehouseSession`)
    query = stream_df.writeStream \
        .outputMode('append') \
        .format(HiveWarehouseSession.STREAM_TO_STREAM) \
        .option("database", "database_name") \
        .option("table", "table_name") \
        .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")) \
        .option("checkpointLocation", "/path/to/checkpoint/dir") \
        .start()

    query.awaitTermination()

I am a beginner with Kafka and have been reading about Kafka performance optimisation techniques, and came across these two settings:

spark.streaming.backpressure.enabled and spark.streaming.kafka.maxRatePerPartition

To enable the first parameter:

sparkConf.set("spark.streaming.backpressure.enabled", "true")

The explanation for the above parameter is given in the official documentation as:

Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition

Now that I am running the application for the first time and there is no previous micro-batch, should I specify some value for spark.streaming.backpressure.initialRate?

If so, how should I determine the value of spark.streaming.backpressure.initialRate? The documentation also says that if spark.streaming.backpressure.enabled is set to true, the maximum receiving rate is set dynamically. If that is the case, do we still need to configure spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition?

This link says that spark.streaming.backpressure.initialRate has no impact when backpressure is enabled.

Any help in clearing the confusion would be much appreciated.


Solution

  • The configurations spark.streaming.[...] you are referring to belong to Spark Streaming (the older DStream / Direct Streaming API) and not to Structured Streaming.

    In case you are unaware of the difference, I recommend having a look at the separate programming guides for Spark Streaming (DStreams) and for Structured Streaming.

    Structured Streaming does not provide a backpressure mechanism. As you are consuming from Kafka you can use (as you are already doing) the option maxOffsetsPerTrigger to limit the number of messages read on each trigger. This option is documented in the Structured Streaming and Kafka Integration Guide as:

    "Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume."


    In case you are still interested in the title question

    How is spark.streaming.kafka.maxRatePerPartition related to spark.streaming.backpressure.enabled in case of spark streaming with Kafka?

    This relation is explained in the documentation on Spark's Configuration:

    "Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)."

    All details on the backpressure mechanism available in Spark Streaming (DStreams, not Structured Streaming) are explained in the blog you have already linked, Enable Back Pressure To Make Your Spark Streaming Application Production Ready.

    Typically, if you enable backpressure you would set spark.streaming.kafka.maxRatePerPartition to roughly 150%-200% of the estimated optimal rate.
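
    As a rough sketch of what that configuration could look like for a DStream job (the numbers here are examples, not recommendations for your workload):

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = (SparkConf()
            .set("spark.streaming.backpressure.enabled", "true")
            # cap at roughly 150%-200% of the estimated optimal per-partition rate
            .set("spark.streaming.kafka.maxRatePerPartition", "10000")
            # lower bound used by the PID estimator (default is 100)
            .set("spark.streaming.backpressure.pid.minRate", "100"))

    # 10-second batch interval, as in the example below
    ssc = StreamingContext(SparkContext(conf=conf), 10)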

    The exact calculation of the PID controller can be found in the code within the class PIDRateEstimator.
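
    In simplified form (a sketch of the computation with the default weights proportional=1.0, integral=0.2, derivative=0.0 and minRate=100, not the exact Scala code):

    # Sketch of how PIDRateEstimator derives the next per-receiver rate (elements/sec)
    def compute_new_rate(latest_rate, latest_error, num_elements,
                         processing_delay_ms, scheduling_delay_ms,
                         batch_interval_ms, seconds_since_update,
                         proportional=1.0, integral=0.2, derivative=0.0, min_rate=100.0):
        processing_rate = num_elements / processing_delay_ms * 1000   # rate actually achieved
        error = latest_rate - processing_rate                         # how far off the last rate was
        historical_error = scheduling_delay_ms * processing_rate / batch_interval_ms
        d_error = (error - latest_error) / seconds_since_update
        new_rate = (latest_rate
                    - proportional * error
                    - integral * historical_error
                    - derivative * d_error)
        return max(new_rate, min_rate)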

    Backpressure Example with Spark Streaming

    As you asked for an example, here is one from one of my production applications:

    Set-Up

    • Kafka topic has 16 partitions
    • Spark runs with 16 worker cores, so each partition can be consumed in parallel
    • Using Spark Streaming (not Structured Streaming)
    • Batch interval is 10 seconds
    • spark.streaming.backpressure.enabled set to true
    • spark.streaming.kafka.maxRatePerPartition set to 10000
    • spark.streaming.backpressure.pid.minRate kept at default value of 100
    • The job can handle around 5000 messages per second per partition
    • Kafka topic contains multiple millions of messages in each partition before starting the streaming job

    Observation

    • In the very first batch the streaming job fetches 16000 (= 10 seconds * 16 partitions * 100 pid.minRate) messages.
    • The job processes these 16000 messages quite fast, so the PID controller estimates an optimal rate larger than the maxRatePerPartition of 10000.
    • Therefore, in the second batch, the streaming job fetches 1600000 (= 10 seconds * 16 partitions * 10000 maxRatePerPartition) messages (see the quick check after this list).
    • The second batch now takes around 22 seconds to finish.
    • Because the batch interval was set to 10 seconds, after 10 seconds the streaming job already schedules the third micro-batch with again 1600000 messages. The reason is that the PID controller can only use performance information from finished micro-batches.
    • Only in the sixth or seventh micro-batch does the PID controller find the optimal processing rate of around 5000 messages per second per partition.
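
    The per-batch counts in the observation follow directly from partitions * rate per partition * batch interval (in seconds); a quick sanity check:

    partitions, batch_interval_s = 16, 10
    print(partitions * 100 * batch_interval_s)     # first batch, pid.minRate = 100     -> 16000
    print(partitions * 10000 * batch_interval_s)   # later batches, maxRatePerPartition -> 1600000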