apache-kafka, spark-structured-streaming

Efficient reading from Kafka with Spark Streaming


I have an app that gets data from Kafka and saves it to a database. My code looks like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

spark.readStream
  .format("kafka")
  .options(options)
  .load()
  .writeStream
  .trigger(Trigger.ProcessingTime(20000))
  .foreachBatch({ (batch: DataFrame, _: Long) =>
    val rowsCount = batch.count
    saveBatch(batch)
    println(s"Saved $rowsCount rows")
  })
  .start()

In the Spark UI, I look at the Structured Streaming tab and see that the process rate of my stream is 100K rows per second. If I remove the row counting, like this:

.foreachBatch({ (batch: DataFrame, _: Long) =>
    saveBatch(batch)
  })
  .start()

the process rate becomes 50K rows per second. As you can see, in the first case I am not using caching, so when I calculate the row count of the batch and then save the batch to the database, I probably read the data from Kafka twice (unlike the second case, where it is read only once). I can't believe that such a trivial operation as counting rows leads to an additional read from Kafka. Are there any approaches to get the batch length without caching the data?
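
For reference, one way to get a per-batch row count without issuing a separate count action is the Dataset.observe API (available since Spark 3.0), which computes a named aggregate as part of the same scan and reports it with each progress update. A minimal sketch, assuming the options and saveBatch from the question and a Spark 3.x cluster:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("kafka")
  .options(options)
  .load()
  // named observation: the count is computed as part of the same scan,
  // so no second pass over the micro-batch is needed
  .observe("batch_stats", count(lit(1)).as("rowsCount"))
  .writeStream
  .trigger(Trigger.ProcessingTime(20000))
  .foreachBatch { (batch: DataFrame, _: Long) =>
    saveBatch(batch)
  }
  .start()

// the observed value is reported per micro-batch, e.g.
// query.lastProgress.observedMetrics.get("batch_stats")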


Solution

  • @OneCricketeer, you are right: Spark doesn't read from Kafka multiple times. I checked the DAG and there is only one "MicroBatchScan" stage in both cases. In addition, I compared network utilization on the Kafka hosts, and there is no difference there either (outgoing traffic is the same). It looks like the "process rate" in the Structured Streaming tab reflects the exact number of rows processed during batch processing, but it doesn't affect reading from Kafka.
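
To double-check this from code rather than the UI, a StreamingQueryListener can log numInputRows (rows actually pulled from Kafka per micro-batch) next to processedRowsPerSecond; if the input row counts match with and without the count, the source is read only once. A minimal sketch, assuming the query from the question runs in the same SparkSession:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // numInputRows: rows read from the source for this micro-batch
    // processedRowsPerSecond: the figure behind the UI's "process rate"
    println(s"batch=${p.batchId} inputRows=${p.numInputRows} " +
      s"processedRowsPerSec=${p.processedRowsPerSecond}")
  }
})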