Tags: apache-spark, iterator, partition, serializable

Wondering why an empty inner iterator causes a "not serializable" exception with mapPartitionsWithIndex


I've been experimenting with Spark's mapPartitionsWithIndex and I ran into problems when trying to return an Iterator of a tuple that itself contained an empty iterator.

I tried several different ways of constructing the inner iterator (via Iterator() and List(...).iterator), and all roads led to this error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 0.0 (TID 2) had a not serializable result: scala.collection.LinearSeqLike$$anon$1
Serialization stack:
        - object not serializable (class: scala.collection.LinearSeqLike$$anon$1, value: empty iterator)
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, (1,empty iterator))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 1)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
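
The same failure shows up without Spark at all: plain java.io serialization of the same (Int, Iterator[Int]) shape rejects the iterator as well. Here is a minimal, self-contained check (the object name is mine, and the exact anonymous iterator class reported may vary with the Scala version):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object IteratorSerializationCheck extends App {
  // Both ways of building the "empty inner iterator" yield an anonymous
  // iterator class that does not implement java.io.Serializable.
  val candidates = Seq(
    "Iterator()"         -> Iterator[Int](),
    "List(...).iterator" -> List.empty[Int].iterator
  )
  for ((label, innerIterator) <- candidates) {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject((1, innerIterator)) // same Tuple2 shape as the Spark result
      println(label + ": serialized OK")
    } catch {
      case e: NotSerializableException =>
        println(label + ": not serializable (" + e.getMessage + ")")
    } finally {
      out.close()
    }
  }
}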

My code example is given below. Note that as written it runs OK: an empty iterator is returned as the mapPartitionsWithIndex result. But if you swap in the commented-out version of the mapPartitionsWithIndex invocation, you will get the error above.

If anyone has a suggestion on how this can be made to work, I'd be much obliged.

import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ANonWorkingExample extends App {
  val sparkConf = new SparkConf().setAppName("continuous").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, Iterator[Int])] =
    parallel.coalesce(3).
      mapPartitionsWithIndex {
        (partitionIndex: Int, inputiterator: Iterator[Int]) =>
          val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
          // Iterator((partitionIndex, mappedInput)) // FAILS
          Iterator()   // No exception, but not really what I want.

      }

  val data = partRDD.collect
  println("data:" + data.toList);
}

Solution

  • So, the dumb thing I was doing was trying to return a non-serializable data structure: an Iterator, exactly as the stack trace indicates.

    The solution is not to return an iterator at all. Instead, materialize it into a collection such as a Seq or a List, which are serializable. The sample program below illustrates the correct way to do what I was trying to do.

    import org.apache.spark.{Partition, SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object AWorkingExample extends App {
      val sparkConf = new SparkConf().setAppName("batman").setMaster("local[*]")
      val sc = new SparkContext(sparkConf)
      val parallel: RDD[Int] = sc.parallelize(1 to 9)
      val parts: Array[Partition] = parallel.partitions
    
      val partRDD: RDD[(Int, List[Int])] =
        parallel.coalesce(3).
          mapPartitionsWithIndex {
            (partitionIndex: Int, inputiterator: Iterator[Int]) =>
              val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
              Iterator((partitionIndex, mappedInput.toList)) // Note the .toList call -- that is what makes it work
          }
    
      val data = partRDD.collect
      println("data:" + data.toList);
    }
    

    By the way, what I was originally trying to do was to see, concretely, which chunks of my parallelized data ended up in which partition (an alternative using glom() is sketched at the end of this answer). Here is the output you get if you run the program:

    data:List((0,List(2, 3)), (1,List(4, 5, 6)), (2,List(7, 8, 9, 10)))

    Interestingly, the data could have been distributed more evenly across the partitions, but wasn't. That's not the point of the question, but I thought it was worth noting.
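
    As a side note, if the goal is only to see which elements landed in which partition, RDD.glom() gets there with less machinery: it gathers each partition into an Array, and arrays (and the lists built from them) serialize without any trouble. Here is a sketch against the same SparkContext setup; glom() and zipWithIndex() are standard RDD methods, the value names are mine, and this shows the raw elements rather than the +1-mapped values:

    // glom() turns RDD[Int] into RDD[Array[Int]], one array per partition, in
    // partition order, so zipping with the index recovers the partition number.
    val perPartition: Array[(Long, List[Int])] =
      sc.parallelize(1 to 9)
        .coalesce(3)
        .glom()
        .zipWithIndex()
        .map { case (elems, partitionIndex) => (partitionIndex, elems.toList) }
        .collect()

    println("data:" + perPartition.toList)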