scala azure apache-spark apache-kafka spark-structured-streaming

Control micro batch of Structured Spark Streaming


I'm reading data from a Kafka topic and writing it to Azure ADLS (HDFS-like) as partitioned Parquet files.

My code looks like this:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("failOnDataLoss", false)
      .load()
      .selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
      .partitionBy("year", "month", "day", "hour", "minute")
      .format("parquet")
      .option("path", outputDirectory)
      .option("checkpointLocation", checkpointDirectory)
      .outputMode("append")
      .start()
      .awaitTermination()

I receive about 2000 records/sec. My problem is that Spark only writes the data every 45 seconds, and I want it to be written immediately.

Does anyone know how to control the size of the micro-batch?


Solution

  • Continuous processing mode has been available (as an experimental feature) since Spark 2.3. According to the official documentation, only three sinks are supported in this mode, and only the Kafka sink is ready for production: "the end-to-end low-latency processing can be best observed with Kafka as the source and sink".

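    import org.apache.spark.sql.streaming.Trigger

    df
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "/tmp/0")
    .option("topic", "output0")
    // The argument to Trigger.Continuous is the checkpoint interval
    .trigger(Trigger.Continuous("0 seconds"))
    .start()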
    

    So it seems that, at the moment, you can't use HDFS as a sink in continuous mode. In your case, you could try Akka Streams with the Alpakka connector.
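
If you stay with the Parquet sink, micro-batch mode is what remains. When no trigger is set, Spark starts the next micro-batch as soon as the previous one finishes, so a 45-second cadence suggests that each batch takes about that long to process and write. A minimal sketch of bounding the batch, assuming the Kafka source option `maxOffsetsPerTrigger` and a `Trigger.ProcessingTime` trigger fit your setup. The object name, the cap of 20000 offsets, and the 5-second interval are hypothetical values, and deriving the partition columns from the Kafka record `timestamp` is an assumption, since the question does not show where they come from:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, dayofmonth, hour, minute, month, year}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object BoundedMicroBatch {
  // Hypothetical tuning values, not taken from the question; adjust to your load.
  val MaxOffsetsPerTrigger: Long = 20000L
  val TriggerInterval: String = "5 seconds"

  def start(
      spark: SparkSession,
      bootstrapServers: String,
      topic: String,
      outputDirectory: String,
      checkpointDirectory: String): StreamingQuery = {

    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("failOnDataLoss", false)
      // Upper bound on the number of offsets read per micro-batch
      .option("maxOffsetsPerTrigger", MaxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "timestamp")

    // Assumption: partition columns are derived from the Kafka record timestamp.
    val partitioned = records
      .withColumn("year", year(col("timestamp")))
      .withColumn("month", month(col("timestamp")))
      .withColumn("day", dayofmonth(col("timestamp")))
      .withColumn("hour", hour(col("timestamp")))
      .withColumn("minute", minute(col("timestamp")))

    partitioned.writeStream
      .partitionBy("year", "month", "day", "hour", "minute")
      .format("parquet")
      .option("path", outputDirectory)
      .option("checkpointLocation", checkpointDirectory)
      .outputMode("append")
      // Start a micro-batch at most once per interval
      .trigger(Trigger.ProcessingTime(TriggerInterval))
      .start()
  }
}
```

Calling `BoundedMicroBatch.start(spark, bootstrapServers, topic, outputDirectory, checkpointDirectory).awaitTermination()` would run the query. Note that the trigger interval is only a lower bound on the time between batches: if writing one batch to ADLS takes longer than the interval, the next batch still waits for it, so the cap on offsets (and the number of partition directories touched per batch) is what mostly determines latency.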