I've been experimenting with Spark's mapPartitionsWithIndex and ran into problems when trying to return an Iterator of tuples where the tuple itself contained an empty iterator.
I tried several different ways of constructing the inner iterator (via Iterator() and List(...).iterator; I show these variants again right after the code below), and all roads led to this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 0.0 (TID 2) had a not serializable result: scala.collection.LinearSeqLike$$anon$1
Serialization stack:
- object not serializable (class: scala.collection.LinearSeqLike$$anon$1, value: empty iterator)
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (1,empty iterator))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 1)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
My code example is given below. Note that, as given, it runs fine (an empty iterator is returned as the mapPartitionsWithIndex result). But if you run it with the commented-out version of the mapPartitionsWithIndex invocation instead, you will get the error above.
If anyone has a suggestion on how this can be made to work, I'd be much obliged.
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ANonWorkingExample extends App {
  val sparkConf = new SparkConf().setAppName("continuous").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)

  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, Iterator[Int])] =
    parallel.coalesce(3).mapPartitionsWithIndex {
      (partitionIndex: Int, inputiterator: Iterator[Int]) =>
        val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
        // Iterator((partitionIndex, mappedInput)) // FAILS with the serialization error above
        Iterator() // No exception, but not really what I want.
    }

  val data = partRDD.collect
  println("data:" + data.toList)
}
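For completeness, these are the inner-iterator constructions I mentioned at the top. Substituting any of them into the commented-out line produces the same "not serializable result" failure, since each of them wraps the partition's data in a plain Iterator (this snippet just restates what I tried; nothing here is new behavior):

// Each of these builds a scala.collection.Iterator, and all of them trigger the
// same serialization failure when used as the second element of the tuple:
val emptyInner: Iterator[Int] = Iterator()          // the "empty iterator" named in the stack trace
val viaApply: Iterator[Int]   = Iterator(2, 3)      // Iterator(...)
val viaList: Iterator[Int]    = List(2, 3).iterator // List(...).iterator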
So, the mistake I was making was trying to return a non-serializable data structure, an Iterator, as the stack trace clearly indicates.
The solution is to not return an Iterator at all. Instead, materialize it into a collection such as a Seq or a List. The sample program below illustrates the correct way to do what I was trying to do.
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AWorkingExample extends App {
  val sparkConf = new SparkConf().setAppName("batman").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)

  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, List[Int])] =
    parallel.coalesce(3).mapPartitionsWithIndex {
      (partitionIndex: Int, inputiterator: Iterator[Int]) =>
        val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
        Iterator((partitionIndex, mappedInput.toList)) // Note the .toList call -- that is what makes it work
    }

  val data = partRDD.collect
  println("data:" + data.toList)
}
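To see why the List version survives where the Iterator version doesn't: Spark (with the default JavaSerializer) has to Java-serialize the collected task results, and a plain Scala Iterator is not java.io.Serializable, whereas List (and the boxed Int and Tuple2 around it) is. A quick way to check a value against that requirement, outside of Spark entirely, is to push it through an ObjectOutputStream. The following is just an illustrative sketch; the object and method names are mine, not part of any API.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Standalone sketch (no Spark): checks whether a value survives plain Java
// serialization, which is what the default JavaSerializer relies on.
// SerializableOrNot and check are made-up names for illustration.
object SerializableOrNot extends App {

  def check(label: String, value: Any): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      println(s"$label -> serializes fine")
    } catch {
      case e: NotSerializableException =>
        println(s"$label -> NotSerializableException: ${e.getMessage}")
    } finally {
      out.close()
    }
  }

  check("(1, List(2, 3))", (1, List(2, 3)))                   // OK: Tuple2, Integer and List are all Serializable
  check("(1, List(2, 3).iterator)", (1, List(2, 3).iterator)) // fails: the List's Iterator is not Serializable
}

Running this prints a success line for the List tuple and a NotSerializableException for the iterator tuple, which is essentially the same failure the Spark job reported above.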
By the way, what I was originally trying to do was to see concretely which chunks of the parallelized data were assigned to which partition. Here is the output you get if you run the program:
data:List((0,List(2, 3)), (1,List(4, 5, 6)), (2,List(7, 8, 9, 10)))
Interestingly, the data distribution could have been balanced more evenly across the three partitions, but wasn't. That's not the point of the question, but I thought it was worth noting.
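As a side note, if the only goal is to see which elements end up in which partition, RDD.glom() does the materialization for you by turning each partition into an Array. Here is a minimal sketch of that alternative (the object name GlomExample is made up; this is not what the programs above use):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch using glom() to inspect partition contents.
object GlomExample extends App {
  val sc = new SparkContext(new SparkConf().setAppName("glom-demo").setMaster("local[*]"))

  // glom() wraps the contents of each partition in an Array, so collect()
  // returns one Array per partition, in partition order.
  val chunks: Array[Array[Int]] = sc.parallelize(1 to 9).coalesce(3).glom().collect()

  chunks.zipWithIndex.foreach { case (chunk, partitionIndex) =>
    println(s"partition $partitionIndex: ${chunk.toList}")
  }

  sc.stop()
}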