I am reading batch records from Redis using Spark Structured Streaming's foreachBatch with the following code (trying to set the batch size with stream.read.batch.size):
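import org.apache.spark.sql.DataFrame

val data = spark.readStream
  .format("redis")
  .option("stream.keys", "mystream")       // hypothetical stream key
  .option("stream.read.batch.size", "128") // intended to cap the batch size
  .load()

val query = data.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // we count the size of batchDF here; we want to limit it
  val batchSize = batchDF.count()
  // some operation
}.start()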
Currently we set stream.read.batch.size to 128, but this does not seem to work: the batch size appears random, sometimes over 1,000 or even 10,000 records.
However, I don't want to wait that long (10,000 records), because some operations (the // some operation comment in the code) need to run as soon as possible. I want to cap the maximum batch size so that as soon as the number of records reaches this limit, the batch is processed immediately. How can I do that?
I am a maintainer of spark-redis. This is currently not supported. The stream.read.batch.size parameter controls the number of items read by a single Redis API call (the COUNT parameter of the XREADGROUP call). It doesn't affect the number of items per trigger (the batchDF size). I have opened a ticket on GitHub for this feature request.
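To make the distinction between "items per Redis call" and "items per trigger" concrete, here is a toy model in plain Scala (no Spark or Redis dependencies). It assumes, as a simplification and not as a description of spark-redis's actual source code, that during one trigger the source keeps issuing XREADGROUP-style calls, each capped at COUNT = stream.read.batch.size, until all pending entries for that trigger are read. The names ReadBatchSizeModel, readSizes and batchDfSize are hypothetical.

```scala
// Toy model only (assumption): one trigger drains every pending entry using
// repeated reads, each limited to `readBatchSize` items (like XREADGROUP COUNT).
object ReadBatchSizeModel {

  /** Sizes of the individual reads issued while draining `pending` entries. */
  def readSizes(pending: Int, readBatchSize: Int): List[Int] = {
    require(readBatchSize > 0, "readBatchSize must be positive")
    require(pending >= 0, "pending must be non-negative")
    // Full reads of readBatchSize items, then one partial read for the remainder.
    val full = List.fill(pending / readBatchSize)(readBatchSize)
    val rest = pending % readBatchSize
    if (rest > 0) full :+ rest else full
  }

  /** Rows that end up in batchDF: the sum of all reads in the trigger. */
  def batchDfSize(pending: Int, readBatchSize: Int): Int =
    readSizes(pending, readBatchSize).sum

  def main(args: Array[String]): Unit = {
    val reads = readSizes(pending = 10000, readBatchSize = 128)
    println(s"Redis calls: ${reads.size}, last call: ${reads.last} items")
    println(s"batchDF rows: ${batchDfSize(10000, 128)}")
  }
}
```

In this model, 10,000 pending entries with a read size of 128 take 79 calls (78 of 128 items plus one of 16), yet batchDF still holds all 10,000 rows; changing stream.read.batch.size only changes the number of round trips, not the size of the micro-batch.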