apache-spark, databricks, spark-structured-streaming

Spark Structured Streaming not able to see the record details


I am trying to process the records from readStream and just print each row. However, I cannot see any printed statements in my driver or executor logs. What might be wrong?

  1. For every record (or ideally every batch), I want to print the message.
  2. For every batch, I want to execute a process.
val kafka = spark.readStream
    .format("kafka")
    .option("maxOffsetsPerTrigger", MAX_OFFSETS_PER_TRIGGER)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) 
    .option("subscribe", topic) // comma separated list of topics
    .option("startingOffsets", "earliest")
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .option("failOnDataLoss", "false")
    .option("minPartitions", sys.env.getOrElse("MIN_PARTITIONS", "64").toInt)
    .load()


  import spark.implicits._

  println("JSON output to write into sink")

  val consoleOutput = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
    //.select(from_json($"json", schema) as "data")
    //.select("data.*")
    //.select(get_json_object(($"value").cast("string"), "$").alias("body"))
    .writeStream
    .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true

      override def process(row: Row): Unit = {
        logger.info(
          s"Record received in data frame is -> " + row.mkString )
          runProcess() // Want to run some process every microbatch

      }

      override def close(errorOrNull: Throwable): Unit = {}


    })
    .outputMode("append")
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()


  consoleOutput.awaitTermination()

}

Solution

  • I copied your code and it runs fine without the runProcess function call.

    If you are planning to do two different things, I recommend having two separate queries after selecting the relevant fields from the Kafka topic (note that a single writeStream query defines exactly one sink, so chaining .foreach(...) and .format("console") in one query means only one of them takes effect):

    val kafkaSelection = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
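
    If you later want to parse the value column as JSON, as the commented-out lines in the original code suggest, a minimal sketch, assuming a hypothetical schema with a single id field:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{StringType, StructType}

    // Hypothetical schema; replace with the actual structure of your messages
    val schema = new StructType().add("id", StringType)

    val parsed = kafkaSelection
      .select(from_json($"value", schema).as("data"))
      .select("data.*")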
    

    1. For every record (or ideally every batch), I want to print the message

    val query1 = kafkaSelection
      .writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .option("checkpointLocation", CHECKPOINT_LOCATION1)
      .start()
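
    If the console output truncates long Kafka values, the console sink also accepts the truncate and numRows options; a small variation of query1 (the option values here are illustrative):

    val query1 = kafkaSelection
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false") // print full key/value strings instead of truncating
      .option("numRows", "50")     // show up to 50 rows per micro-batch
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .option("checkpointLocation", CHECKPOINT_LOCATION1)
      .start()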
    

    2. For every batch, I want to execute a process.

    val query2 = kafkaSelection
      .writeStream
      .foreach(new ForeachWriter[Row] {
          override def open(partitionId: Long, epochId: Long): Boolean = true
    
          override def process(row: Row): Unit = {
            logger.info(
              s"Record received in data frame is -> " + row.mkString )
              runProcess() // Want to run some process every microbatch
    
          }
    
          override def close(errorOrNull: Throwable): Unit = {}
    
        })
      .outputMode("append")
      .option("checkpointLocation", CHECKPOINT_LOCATION2)
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()
    
    
    

    Also note that I have set the checkpoint location for each query individually, which ensures consistent tracking of the Kafka offsets. Make sure to use two different checkpoint locations, one per query. You can run both queries in parallel.
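
    Since runProcess is meant to run once per micro-batch rather than once per record, foreachBatch (available since Spark 2.4) may be a better fit than ForeachWriter for the second requirement. A minimal sketch, assuming runProcess, logger, and CHECKPOINT_LOCATION2 from the code above (query2Batch is just a hypothetical name):

    import org.apache.spark.sql.DataFrame

    val query2Batch = kafkaSelection
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Collecting to the driver is for illustration only and assumes small batches
        batchDF.collect().foreach(row => logger.info(s"Record received in batch $batchId -> " + row.mkString))
        runProcess() // runs exactly once per micro-batch
      }
      .option("checkpointLocation", CHECKPOINT_LOCATION2)
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()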

    It is important to define both queries before waiting for their termination:

    query1.awaitTermination()
    query2.awaitTermination()
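
    Alternatively, Spark's StreamingQueryManager can block until any of the active queries terminates:

    spark.streams.awaitAnyTermination()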
    

    Tested with Spark 2.4.5.