Tags: apache-spark, redis, spark-structured-streaming

How to set maximum number of rows in one micro-batch?


I am reading batch records from Redis with Spark Structured Streaming's foreachBatch, using the following code (trying to set the batch size via stream.read.batch.size):

val data = spark.readStream.format("redis")
  .option("stream.read.batch.size", 128) // option() takes a key and a value
  .load()
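(For completeness, a spark-redis stream read normally also specifies which stream key to consume and a schema for the entry fields; the snippet above elides these. A minimal sketch of the fuller setup, where the stream name "events" and the field "payload" are placeholder names, not from my actual job:)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("redis-stream").getOrCreate()

val data = spark.readStream
  .format("redis")
  .option("stream.keys", "events")        // placeholder stream name
  .option("stream.read.batch.size", 128)  // items per Redis read call
  .schema(StructType(Array(
    StructField("payload", StringType)    // placeholder field
  )))
  .load()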

val query = data.writeStream.foreachBatch {
  (batchDF: DataFrame, batchId: Long) =>
    // we count the size of batchDF here; we want to limit its size
    // some operation
    ...
}.start()
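(In case it helps reproduce the issue, this is roughly how the complete write path looks; the checkpoint path is a placeholder, and batchDF.count() is how we measure the batch size:)

import org.apache.spark.sql.DataFrame

val query = data.writeStream
  .option("checkpointLocation", "/tmp/ckpt") // placeholder path
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"batch $batchId has ${batchDF.count()} rows") // observed sizes vary widely
    // some operation
  }
  .start()

query.awaitTermination()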

We currently set stream.read.batch.size to 128, but it does not seem to have any effect: the batch size appears to be arbitrary, sometimes over 1,000 or even 10,000 rows.

However, I do not want to wait for that many records (10,000), because some operations (marked // some operation in the code) need to run as soon as possible. I want to set a maximum batch size so that once the number of records reaches that limit, the batch is processed immediately. How can I do that?


Solution

  • I am a maintainer of spark-redis. This is currently not supported. The stream.read.batch.size parameter controls the number of items read by a single Redis API call (the COUNT parameter of the XREADGROUP command). It doesn't affect the number of items per trigger (the size of batchDF). I have opened a ticket on GitHub for this feature request.
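(Not from the maintainer's answer: until that feature lands, one partial mitigation is a shorter processing-time trigger, which limits how long records can accumulate between micro-batches. It is not a hard cap on rows per batch, only a way to keep batches small and process records sooner. A sketch:)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val query = data.writeStream
  .trigger(Trigger.ProcessingTime("500 milliseconds")) // fire often so fewer rows accumulate
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // some operation runs sooner, on smaller (but not strictly capped) batches
  }
  .start()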