I am trying to write data into a Kafka topic after reading a Hive table, as below.
write_kafka_data.py:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# kafka_broker and oauth_config (the SASL/OAUTHBEARER JAAS string) are defined elsewhere.
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")

# Serialise each row into a single JSON string in the mandatory "value" column.
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))

final_df.write.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("kafka.batch.size", 51200) \
    .option("retries", 3) \
    .option("kafka.max.request.size", 500000) \
    .option("kafka.max.block.ms", 120000) \
    .option("kafka.metadata.max.age.ms", 120000) \
    .option("kafka.request.timeout.ms", 120000) \
    .option("kafka.linger.ms", 0) \
    .option("kafka.delivery.timeout.ms", 130000) \
    .option("acks", "1") \
    .option("kafka.compression.type", "snappy") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", oauth_config) \
    .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
    .option("kafka.sasl.mechanism", "OAUTHBEARER") \
    .option("topic", "topic_name") \
    .save()
After a successful write (29,000 records), I am reading data from the same topic as below in another file, read_kafka_data.py:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col
# HiveWarehouseSession comes from the Hive Warehouse Connector (pyspark_llap).
from pyspark_llap import HiveWarehouseSession

spark = SparkSession.builder.getOrCreate()

# SCHEMA
schema = StructType([StructField("col1", StringType()),
                     StructField("col2", IntegerType())
                     ])

# READ FROM TOPIC
# kafka_broker is defined elsewhere.
jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
    + " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
    + " oauth.client.id=" + '"' + "client_id" + '"' \
    + " oauth.client.secret=" + '"' + "secret_key" + '" ;'

stream_df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', kafka_broker) \
    .option('subscribe', 'topic_name') \
    .option('kafka.security.protocol', 'SASL_SSL') \
    .option('kafka.sasl.mechanism', 'OAUTHBEARER') \
    .option('kafka.sasl.jaas.config', jaas_config) \
    .option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
    .option('startingOffsets', 'latest') \
    .option('group.id', 'group_id') \
    .option('maxOffsetsPerTrigger', 200) \
    .option('fetchOffset.retryIntervalMs', 200) \
    .option('fetchOffset.numRetries', 3) \
    .load() \
    .select(from_json(col('value').cast('string'), schema).alias("json_dta")) \
    .selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append') \
    .format(HiveWarehouseSession.STREAM_TO_STREAM) \
    .option("database", "database_name") \
    .option("table", "table_name") \
    .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")) \
    .option("checkpointLocation", "/path/to/checkpoint/dir") \
    .start().awaitTermination()
I am a beginner with Kafka and have been reading about Kafka performance optimisation techniques, and I came across these two settings: spark.streaming.backpressure.enabled and spark.streaming.kafka.maxRatePerPartition.
To enable the first parameter:
sparkConf.set("spark.streaming.backpressure.enabled", "true")
The explanation for this parameter is given in the official documentation as:
"Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)."
Since I am running the application for the first time and there is no previous micro-batch, should I specify some value for spark.streaming.backpressure.initialRate? If so, how should I determine that value?
The documentation also says that if spark.streaming.backpressure.enabled is set to true, the maximum receiving rate is set dynamically. If that is the case, do we still need to configure spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition when spark.streaming.backpressure.enabled is set to true?
This link says that spark.streaming.backpressure.initialRate has no impact once backpressure is applied.
Any help in clearing the confusion would be much appreciated.
The spark.streaming.[...] configurations you are referring to belong to Direct Streaming (aka Spark Streaming with DStreams), not to Structured Streaming.
In case you are unaware of the difference, I recommend having a look at the separate programming guides: the Structured Streaming Programming Guide and the Spark Streaming (DStreams) Programming Guide.
Structured Streaming does not provide a backpressure mechanism. Since you are consuming from Kafka, you can use the option maxOffsetsPerTrigger (as you are already doing) to limit the number of messages read on each trigger. This option is documented in the Structured Streaming and Kafka Integration Guide as:
"Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume."
In case you are still interested in the title question:
How is spark.streaming.kafka.maxRatePerPartition related to spark.streaming.backpressure.enabled in case of Spark Streaming with Kafka?
This relation is explained in the documentation on Spark's Configuration:
"Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)."
All details on the backpressure mechanism available in Spark Streaming (DStream, not Structured Streaming) are explained in the blog you have already linked, Enable Back Pressure To Make Your Spark Streaming Application Production Ready.
Typically, if you enable backpressure you would set spark.streaming.kafka.maxRatePerPartition to be 150% ~ 200% of the optimal estimated rate.
The exact calculation of the PID controller can be found in the code within the class PIDRateEstimator.
As you asked for an example, here is one that I have used in one of my production applications:
- spark.streaming.backpressure.enabled set to true
- spark.streaming.kafka.maxRatePerPartition set to 10000
- spark.streaming.backpressure.pid.minRate kept at its default value of 100
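For completeness, a minimal sketch of how those settings could be applied to a DStream job; the application name and the 10-second batch interval are assumptions on my part:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Sketch only: the example settings above applied to a Spark Streaming (DStream) job.
conf = SparkConf() \
    .setAppName("dstream-backpressure-example") \
    .set("spark.streaming.backpressure.enabled", "true") \
    .set("spark.streaming.kafka.maxRatePerPartition", "10000")
# spark.streaming.backpressure.pid.minRate is left at its default of 100.

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, batchDuration=10)  # 10-second batches (assumed)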