pyspark, spark-structured-streaming, checkpoint, batchsize

How can I set the micro-batch size in Spark Structured Streaming when reading from a Kafka topic?


I have a Spark Structured Streaming app that reads from Kafka and writes to Elasticsearch and S3. Checkpointing to an S3 bucket is enabled as well (the app runs on AWS EMR). Looking at that bucket, I noticed that over time the commits become less frequent and the delay in the data keeps growing.

So I want Spark to always process batches with the same amount of data per batch. I tried setting .option("maxOffsetsPerTrigger", 100), but the batches didn't get any smaller and there was still a long time between commits. As I understood it, this option only tells Spark how much data to consume from Kafka per poll, and Spark just polls several times before writing, so it doesn't actually limit the batch size.
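For comparison, the Spark Kafka integration guide describes maxOffsetsPerTrigger as a rate limit on the total number of offsets processed per trigger interval, split proportionally across topic partitions of different volume. The cap counts Kafka records (offsets), not bytes, so when individual records are large even a cap of 100 can still produce heavy batches. The sketch below is a simplified pure-Python model of that proportional split, not Spark's actual code; the function name and the backlog numbers are hypothetical.

```python
import math


def split_offset_budget(available, max_offsets_per_trigger):
    """Approximate how a per-trigger offset cap is divided across partitions.

    available: dict mapping partition id -> number of new offsets waiting.
    Returns a dict mapping partition id -> offsets taken in the next micro-batch.
    (Hypothetical helper; a simplified model of the documented behavior.)
    """
    total = sum(available.values())
    if total <= max_offsets_per_trigger:
        # Under the cap: the whole backlog fits into a single micro-batch.
        return dict(available)
    taken = {}
    for partition, size in available.items():
        # Each partition gets a share proportional to its backlog.
        share = max_offsets_per_trigger * size / total
        # Round down, but round tiny non-zero shares up to 1 so small
        # partitions are not starved; never take more than is available.
        n = math.ceil(share) if share < 1 else math.floor(share)
        taken[partition] = min(size, n)
    return taken
```

For example, with a backlog of {0: 900, 1: 100, 2: 0} offsets and a cap of 50, this model takes 45 offsets from partition 0 and 5 from partition 1, so the whole micro-batch holds about 50 records regardless of how many polls the consumer makes.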

I also tried continuous mode, but the submit failed, I guess because the output sink (foreachBatch) doesn't support it.

Any ideas are welcome, I'll try anything ^^


Solution

  • Each offset actually contained so much data that I had to lower maxOffsetsPerTrigger to 50. I also had to delete the old checkpoint folder: I read somewhere that, on restart, Spark first finishes the batch whose offsets are already recorded in the checkpoint, and only then applies maxOffsetsPerTrigger.
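A minimal sketch of the reader and writer setup the solution describes, assuming the query writes through foreachBatch. The broker address, topic, checkpoint path, and the write_batch callback (which would write each micro-batch to Elasticsearch and S3) are hypothetical placeholders; the option names kafka.bootstrap.servers, subscribe, maxOffsetsPerTrigger, startingOffsets and checkpointLocation are the standard Kafka source and streaming writer options.

```python
def kafka_source_options(bootstrap_servers, topic, max_offsets_per_trigger=50,
                         starting_offsets="latest"):
    """Options for spark.readStream.format("kafka") that cap each micro-batch."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Cap on Kafka records (offsets) per micro-batch, summed over all partitions.
        "maxOffsetsPerTrigger": str(max_offsets_per_trigger),
        # Only used when the query starts without an existing checkpoint;
        # a resumed query continues from the offsets stored in its checkpoint.
        "startingOffsets": starting_offsets,
    }


def start_capped_query(spark, write_batch, checkpoint_dir, **source_kwargs):
    """Start a Kafka -> foreachBatch query with a per-trigger offset cap.

    spark: an active SparkSession.
    write_batch: hypothetical callback write_batch(df, batch_id).
    """
    return (
        spark.readStream.format("kafka")
        .options(**kafka_source_options(**source_kwargs))
        .load()
        .writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

Usage would look like start_capped_query(spark, write_batch, "s3://my-bucket/checkpoints/v2/", bootstrap_servers="broker:9092", topic="events"). Pointing checkpoint_dir at a new, empty folder has the same effect as deleting the old one. In that case startingOffsets decides where reading resumes: "latest" (Spark's default for streaming queries) skips the existing backlog, while "earliest" re-reads everything Kafka still retains, which may duplicate writes to the sinks.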