I am trying to process the records from a `readStream` and simply print each row. However, I can't see any of the printed statements in either my driver logs or my executor logs. What might be wrong?
```scala
val kafka = spark.readStream
  .format("kafka")
  .option("maxOffsetsPerTrigger", MAX_OFFSETS_PER_TRIGGER)
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("subscribe", topic) // comma separated list of topics
  .option("startingOffsets", "earliest")
  .option("checkpointLocation", CHECKPOINT_LOCATION)
  .option("failOnDataLoss", "false")
  .option("minPartitions", sys.env.getOrElse("MIN_PARTITIONS", "64").toInt)
  .load()

import spark.implicits._

println("JSON output to write into sink")

val consoleOutput = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  //.select(from_json($"json", schema) as "data")
  //.select("data.*")
  //.select(get_json_object(($"value").cast("string"), "$").alias("body"))
  .writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def process(row: Row): Unit = {
      logger.info(
        s"Record received in data frame is -> " + row.mkString)
      runProcess() // Want to run some process every microbatch
    }
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

consoleOutput.awaitTermination()
}
```
I copied your code and it is running fine without the `runProcess` function call.
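If you want to do two different things (print the rows to the console and run `runProcess`), I recommend using two separate queries after selecting the relevant fields from the Kafka topic. A single `writeStream` has only one sink: calling `.format("console")` after `.foreach(...)` replaces the foreach sink with the console sink, so the `ForeachWriter` (and its `logger.info` call) is never invoked.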
```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

val kafkaSelection = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")

// Query 1: print the selected rows to the driver's stdout every 30 seconds
val query1 = kafkaSelection
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("checkpointLocation", CHECKPOINT_LOCATION1)
  .start()

// Query 2: custom processing; no .format(...) here, so the foreach sink is kept
val query2 = kafkaSelection
  .writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true
    // Runs on the executors, so this log line ends up in the executor logs
    override def process(row: Row): Unit = {
      logger.info(s"Record received in data frame is -> ${row.mkString}")
      runProcess() // note: process() is called once per row, not once per micro-batch
    }
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("append")
  .option("checkpointLocation", CHECKPOINT_LOCATION2)
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```
Also note that I set the checkpoint location on each query's `writeStream` individually, which ensures consistent tracking of the Kafka offsets per query. In your code it is set on `readStream`, where the Kafka source ignores it. Make sure each query has its own, distinct checkpoint location. Both queries then run in parallel.
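Separately, the comment next to `runProcess()` says it should run once per micro-batch, but `ForeachWriter.process` is called once per row, on the executors. If per-micro-batch behaviour is what you need, `foreachBatch` (available since Spark 2.4) is one option. The following is only a minimal sketch under stated assumptions: a `MemoryStream` (an internal Spark test helper, not a public API) stands in for the Kafka source so it runs locally, `runProcess` and `processedBatches` are hypothetical stand-ins that just record the batch id, and the checkpoint directory is a temporary one.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import scala.collection.mutable.ArrayBuffer

object ForeachBatchSketch {
  // Hypothetical stand-in for the question's runProcess(): it only records the batch id.
  val processedBatches: ArrayBuffer[Long] = ArrayBuffer.empty[Long]
  def runProcess(batchId: Long): Unit = processedBatches.synchronized {
    processedBatches += batchId
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("foreachBatch-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    // MemoryStream (internal test helper) replaces the Kafka source in this sketch.
    val input = MemoryStream[(String, String)]
    val selection = input.toDF().toDF("key", "value")

    // An explicitly typed function value avoids the ambiguity between the Scala and Java
    // overloads of foreachBatch that Spark 2.4 reports when compiled with Scala 2.12.
    val perBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.show(truncate = false) // this function runs on the driver: output goes to driver stdout
      runProcess(batchId)            // called once per micro-batch, not once per row
    }

    val query = selection.writeStream
      .foreachBatch(perBatch)
      .option("checkpointLocation", Files.createTempDirectory("cp-foreach-batch").toString)
      .start()

    input.addData(("k1", "v1"), ("k2", "v2"))
    query.processAllAvailable() // block until the rows added above have been handled
    query.stop()
    spark.stop()
  }
}
```

With the real Kafka source you would pass `kafkaSelection` instead of the `MemoryStream` DataFrame and drop `processAllAvailable()`/`stop()` in favour of waiting on the query.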
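It is important to define (and start) both queries before waiting for their termination, because `awaitTermination()` blocks the calling thread; a query defined after that call would never be started:

```scala
query1.awaitTermination()
query2.awaitTermination()
```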
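Because `query1.awaitTermination()` blocks until `query1` stops, `query2.awaitTermination()` is only reached afterwards; both queries still run in parallel, since `start()` has already launched them. If you would rather return as soon as either query stops or fails, `spark.streams.awaitAnyTermination()` is an alternative. This is a sketch under assumptions: it uses a local `SparkSession` and the built-in `rate` source in place of Kafka so it runs without a broker, the in-memory table name is hypothetical, and the checkpoint directories are temporary.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TwoQueriesSketch {
  // Recorded so the behaviour can be checked after main() returns.
  var activeAfterStart: Int = -1
  var anyTerminated: Boolean = true

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("two-queries-sketch").getOrCreate()

    // The built-in rate source stands in for the Kafka selection from the answer.
    val selection = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .selectExpr("CAST(timestamp AS STRING) AS key", "CAST(value AS STRING) AS value")

    // Each query gets its own checkpoint directory.
    val query1 = selection.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 second"))
      .option("checkpointLocation", Files.createTempDirectory("cp-query1").toString)
      .start()

    val query2 = selection.writeStream
      .format("memory")
      .queryName("selection_copy") // hypothetical in-memory table name
      .option("checkpointLocation", Files.createTempDirectory("cp-query2").toString)
      .start()

    activeAfterStart = spark.streams.active.length

    // Returns as soon as ANY active query stops or fails. The timeout (in ms) is only here so
    // that the sketch ends; a long-running job would call spark.streams.awaitAnyTermination().
    anyTerminated = spark.streams.awaitAnyTermination(2000)

    query1.stop()
    query2.stop()
    spark.stop()
  }
}
```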
Tested with Spark 2.4.5.