
Why is there no output after Dataset.mapPartitions (data stops reaching downstream stream processing)?


I am using Spark Structured Streaming and am facing problems with mapPartitions.

If we comment out the foreach operation inside mapPartitions, it works just fine.

The Spark UI shows the job as complete, yet the query produces no output.

stream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .map { data => MetricDataFilter.filter_metric_data(data._1, data._2) }
  .mapPartitions { partition =>
    val lagged = partition.filter { x => x.metric_type == "n" }
    lagged.foreach { x =>
      val diff = Instant.now().toEpochMilli - x.timestamp.getTime
      println(x.toString)
      println(diff)
    }
    partition
  }
  .filter($"metric_type" === "c")
  .withWatermark("timestamp", "5 minutes")
  .groupBy(
    window($"timestamp", "30 seconds", "30 seconds"),
    $"metric_name", $"timestamp")
  .sum("metric_value").as("sum_metric_value")
  .writeStream
  .queryName("countMetricQuery")
  .outputMode(OutputMode.Update)
  .format("console")
  .start

Removing the following foreach from mapPartitions makes it work fine:

lagged.foreach { x =>
  val diff = Instant.now().toEpochMilli - x.timestamp.getTime
  diff > 0 match {
    case false => println(x.toString)
    case true  => println(x.toString)
  }
  println(diff)
}

Solution

  • It's a tricky issue to spot, and it comes down to how Iterator works in Scala.

    The signature of mapPartitions shows that your function works with an Iterator[T]:

    mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]

    Returns a new Dataset that contains the result of applying func to each partition.

    scala.Iterator is a single-pass data structure, so once its elements are consumed, they're simply gone.

    See for yourself:

    scala> val it = Seq(1,2,3).toIterator
    it: Iterator[Int] = <iterator>
    
    scala> it.foreach(println)
    1
    2
    3
    
    scala> it.foreach(println)
    

    As you can see, nothing is printed by the second foreach because no elements are left to consume.
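
    The same applies when the iterator is consumed indirectly. Here is a minimal plain-Scala sketch (no Spark; the tuples are made-up stand-ins for the question's records) in which draining a filtered iterator also drains its source:

```scala
// Plain Scala, no Spark. The tuples are hypothetical (name, metric_type) records.
val partition = Iterator(("a", "n"), ("b", "c"), ("c", "n"))

// `filter` on an Iterator does not copy anything: it returns a lazy wrapper
// whose next()/hasNext advance `partition` underneath it.
val lagged = partition.filter { case (_, metricType) => metricType == "n" }

// Fully consuming `lagged` therefore walks `partition` to its end.
val seen = lagged.map(_._1).toList

// With the standard library's implementation, nothing is left for
// whoever reads `partition` afterwards (e.g. Spark, after mapPartitions returns it).
val leftover = partition.toList
```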

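    In other words, lagged is not a copy of partition: filter returns a lazy iterator that pulls its elements from partition itself, so lagged.foreach consumes partition. By the time mapPartitions returns partition, it is already exhausted, so your streaming query has no data left to process and hence produces no output.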
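
    One way to avoid the problem, sketched here under assumptions, is to do the logging in a single pass with map, so each record is logged and then handed on unchanged. Metric and its fields are a hypothetical stand-in for whatever MetricDataFilter.filter_metric_data returns, and the log parameter exists only to make the sketch testable outside Spark:

```scala
import java.sql.Timestamp
import java.time.Instant

// Hypothetical stand-in for the record type in the question (fields assumed).
case class Metric(metric_name: String, metric_type: String,
                  metric_value: Double, timestamp: Timestamp)

// Single pass: log lagged ("n") records as a side effect while passing every
// record through unchanged. Nothing is consumed ahead of time, so the returned
// iterator still yields the whole partition.
def logLagged(partition: Iterator[Metric],
              log: String => Unit = s => println(s)): Iterator[Metric] =
  partition.map { x =>
    if (x.metric_type == "n") {
      val diff = Instant.now().toEpochMilli - x.timestamp.getTime
      log(x.toString)
      log(diff.toString)
    }
    x
  }

// Usage outside Spark, collecting log lines instead of printing them.
val now = Timestamp.from(Instant.now())
val input = Seq(
  Metric("cpu", "n", 1.0, now),
  Metric("mem", "c", 2.0, now),
  Metric("disk", "c", 3.0, now))
val logged = scala.collection.mutable.ListBuffer.empty[String]
val out = logLagged(input.iterator, s => logged += s).toList
```

    Inside the query, the mapPartitions body could then become partition => logLagged(partition), assuming the Dataset's element type matches Metric and an Encoder is in scope (e.g. via spark.implicits._). Because map is lazy, records are logged as Spark pulls them downstream. Materializing the partition first (partition.toList) would also work, at the cost of holding the whole partition in memory. Note that println inside mapPartitions runs on the executors, so in cluster mode its output appears in the executor logs rather than the driver console.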