I want to limit the rate when fetching data from Kafka. My code looks like this:
df = spark.read.format('kafka') \
    .option("kafka.bootstrap.servers", '...') \
    .option("subscribe", 'A') \
    .option("startingOffsets", '''{"A":{"0":200,"1":200,"2":200}}''') \
    .option("endingOffsets", '''{"A":{"0":400,"1":400,"2":400}}''') \
    .option("maxOffsetsPerTrigger", 20) \
    .load() \
    .cache()
However, when I call df.count(), the result is 600. What I expected was 20. Does anyone know why "maxOffsetsPerTrigger" doesn't work?
You are reading 200 records from each of the three partitions (0, 1, 2), because each offset range runs from 200 to 400: (400 - 200) × 3 = 600 records in total.
As the documentation puts it:
Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.
This means that in a streaming query each trigger fetches at most 20 records, but across all triggers you still end up consuming every record in the configured range (200 per partition). Crucially, maxOffsetsPerTrigger only applies to streaming reads; a batch read like yours (spark.read) has no triggers at all, so the option is ignored and the full offset range is loaded in one go, which is why count() returns 600.
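If your goal really is to rate-limit consumption, the option needs a streaming read (spark.readStream) rather than a batch one. Here is a minimal sketch under that assumption; the broker address, console sink, and checkpoint path are placeholders, and endingOffsets is dropped because streaming queries don't support it:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-rate-limit").getOrCreate()

# maxOffsetsPerTrigger is honored only by streaming (readStream) queries.
stream_df = spark.readStream.format('kafka') \
    .option("kafka.bootstrap.servers", '...') \
    .option("subscribe", 'A') \
    .option("startingOffsets", '''{"A":{"0":200,"1":200,"2":200}}''') \
    .option("maxOffsetsPerTrigger", 20) \
    .load()

# Each micro-batch now processes at most 20 offsets across all partitions.
# The console sink and checkpoint path are for illustration only.
query = stream_df.writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/kafka-rate-limit-ckpt") \
    .start()

query.awaitTermination()

If you only ever need offsets 200 to 400 once, the batch read with endingOffsets that you already have is the right tool; maxOffsetsPerTrigger simply has no effect there.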