I am using Spark Structured Streaming and am facing a problem with mapPartitions. If I comment out the foreach operation inside mapPartitions, it works just fine and the Spark UI shows the job as complete.
stream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .map { data => MetricDataFilter.filter_metric_data(data._1, data._2) }
  .mapPartitions { partition =>
    val lagged = partition.filter { x => x.metric_type == "n" }
    lagged.foreach { x =>
      val diff = Instant.now().toEpochMilli - x.timestamp.getTime
      println(x.toString)
      println(diff)
    }
    partition
  }
  .filter($"metric_type" === "c")
  .withWatermark("timestamp", "5 minutes")
  .groupBy(
    window($"timestamp", "30 seconds", "30 seconds"),
    $"metric_name", $"timestamp")
  .sum("metric_value").as("sum_metric_value")
  .writeStream
  .queryName("countMetricQuery")
  .outputMode(OutputMode.Update)
  .format("console")
  .start
Removing this foreach from mapPartitions makes the query work fine:
lagged.foreach { x =>
  val diff = Instant.now().toEpochMilli - x.timestamp.getTime
  diff > 0 match {
    case false => println(x.toString)
    case true  => println(x.toString)
  }
  println(diff)
}
It's a very tricky issue to spot, and it is due to how Iterator works in Scala. The following is the signature of mapPartitions, which shows that you work with an Iterator[T]:
mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]
Returns a new Dataset that contains the result of applying func to each partition.
scala.Iterator
is a single-pass data structure, so once all elements are consumed, they're simply gone. See for yourself:
scala> val it = Seq(1,2,3).toIterator
it: Iterator[Int] = <iterator>
scala> it.foreach(println)
1
2
3
scala> it.foreach(println)
As you can see, nothing is printed by the second foreach because no elements were left to consume. In other words, your streaming query has no data left to process after mapPartitions, and hence produces no output.
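One way out is to perform the side effect inside a map that returns each element, so the iterator is only consumed once, by the downstream operators. A minimal sketch outside Spark illustrating both the bug and that remedy (the plain integers here stand in for the question's metric records, which is an assumption for the demo):

```scala
object IteratorDemo {
  def main(args: Array[String]): Unit = {
    // Broken: foreach drains the iterator, so returning it yields nothing.
    val drained = Iterator(1, 2, 3)
    drained.foreach(x => println(s"saw $x"))
    println(drained.toList) // empty: all elements were already consumed

    // Fixed: do the side effect inside map and pass the element through,
    // so downstream consumers still see every element.
    val passedThrough = Iterator(1, 2, 3).map { x =>
      println(s"saw $x") // side effect happens as elements flow past
      x
    }
    println(passedThrough.toList) // List(1, 2, 3)
  }
}
```

In the question's code, the same idea would mean replacing the filter-plus-foreach with a single `partition.map { x => if (x.metric_type == "n") { /* log the lag */ }; x }`, so that mapPartitions returns a live iterator instead of an exhausted one.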