Tags: java, scala, apache-spark, kryo

spark - How to reduce the shuffle size of a JavaPairRDD<Integer, Integer[]>?


I have a JavaPairRDD<Integer, Integer[]> on which I want to perform a groupByKey action.

The groupByKey action gives me a:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle

which is practically an OutOfMemory error, if I am not mistaken. This occurs only in big datasets (in my case when "Shuffle Write" shown in the Web UI is ~96GB).

I have set:

spark.serializer org.apache.spark.serializer.KryoSerializer

in $SPARK_HOME/conf/spark-defaults.conf, but I am not sure if Kryo is used to serialize my JavaPairRDD.

Is there something else that I should do to use Kryo, apart from setting this conf parameter, to serialize my RDD? I can see in the serialization instructions that:

Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.

and that:

Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.

I also noticed that when I set spark.serializer to be Kryo, the Shuffle Write in the Web UI increases from ~96GB (with default serializer) to 243GB!

EDIT: In a comment, I was asked about the logic of my program, in case groupByKey can be replaced with reduceByKey. I don't think it's possible, but here it is anyway:

  • Input has the form:

    • key: index bucket id,
    • value: Integer array of entity ids in this bucket
  • The shuffle write operation produces pairs in the form:

    • entityId
    • Integer array of all entity Ids in the same bucket (call them neighbors)
  • The groupByKey operation gathers all the neighbor arrays of each entity, some possibly appearing more than once (in many buckets).

  • After the groupByKey operation, I keep a weight for each bucket (based on the number of negative entity ids it contains) and for each neighbor id I sum up the weights of the buckets it belongs to.

  • I normalize the scores of each neighbor id with another value (let's say it's given) and emit the top-3 neighbors per entity.
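
To make the steps above concrete, here is a minimal single-machine sketch of what happens to one entity's group after the groupByKey. It is not the original Spark job. The weight formula in `bucketWeight`, the `normalizer` map, the decision to skip the entity itself, and the class and method names are all assumptions made for illustration, since the post does not specify them.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NeighborScoring {

    // Hypothetical weight: the post only says the weight depends on the
    // number of negative ids in the bucket, so this formula is an assumption.
    static double bucketWeight(int[] bucket) {
        int negatives = 0;
        for (int id : bucket) {
            if (id < 0) negatives++;
        }
        return 1.0 / (1 + negatives);
    }

    // neighborArrays: all arrays gathered for one entity by groupByKey.
    // normalizer: the "given" per-neighbor value; missing ids default to 1.0.
    static List<Integer> topNeighbors(int entityId, List<int[]> neighborArrays,
                                      Map<Integer, Double> normalizer, int k) {
        // Sum the weights of every bucket in which each neighbor appears.
        Map<Integer, Double> scores = new HashMap<>();
        for (int[] bucket : neighborArrays) {
            double weight = bucketWeight(bucket);
            for (int neighbor : bucket) {
                if (neighbor != entityId) { // assumption: an entity is not its own neighbor
                    scores.merge(neighbor, weight, Double::sum);
                }
            }
        }
        // Normalize each score, then rank by score (descending) and id (ascending).
        List<Map.Entry<Integer, Double>> ranked = new ArrayList<>();
        for (Map.Entry<Integer, Double> e : scores.entrySet()) {
            double norm = normalizer.getOrDefault(e.getKey(), 1.0);
            ranked.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue() / norm));
        }
        ranked.sort((a, b) -> {
            int byScore = Double.compare(b.getValue(), a.getValue());
            return byScore != 0 ? byScore : Integer.compare(a.getKey(), b.getKey());
        });
        List<Integer> top = new ArrayList<>();
        for (int i = 0; i < Math.min(k, ranked.size()); i++) {
            top.add(ranked.get(i).getKey());
        }
        return top;
    }

    public static void main(String[] args) {
        List<int[]> groups = Arrays.asList(
                new int[]{1, 2, -3}, new int[]{1, 2, 4}, new int[]{1, -3, -5, 6});
        System.out.println(topNeighbors(1, groups, new HashMap<>(), 3));
    }
}
```

The point of the sketch is that the final score needs every neighbor array of an entity, which is why the grouping step looks hard to avoid in this formulation.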

The number of distinct keys that I get is around 10 million (around 5 million positive entity ids and 5 million negatives).

EDIT2: I tried using Hadoop's Writables (VIntWritable and VIntArrayWritable extending ArrayWritable) instead of Integer and Integer[], respectively, but the shuffle size was still bigger than the default JavaSerializer.

Then I increased spark.shuffle.memoryFraction from 0.2 to 0.4 (it is deprecated in version 2.1.0, but I found no description of what should be used instead) and enabled off-heap memory, which reduced the shuffle size by ~20GB. Although this does what the title asks, I would prefer a more algorithmic solution, or one that includes better compression.


Solution

  • Short Answer: Use fastutil and maybe increase spark.shuffle.memoryFraction.

    More details: The problem with this RDD is that Java has to store object references, which consume much more space than primitive types. In this example, I have to store Integer objects instead of int values. A Java Integer takes 16 bytes, while a primitive Java int takes 4 bytes. Scala's Int type, on the other hand, is a 32-bit (4-byte) type, just like Java's int, which is why people using Scala may not have run into this problem.

    Apart from increasing spark.shuffle.memoryFraction to 0.4, another nice solution was to use the fastutil library, as suggested in Spark's tuning documentation:

    The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this: Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.

    This makes it possible to store each element of the int array in my RDD pair as a primitive int (i.e., using 4 bytes instead of 16 for each element of the array). In my case, I used IntArrayList instead of Integer[]. This made the shuffle size drop significantly and allowed my program to run on the cluster. I also used this library in other parts of the code, where I was building some temporary Map structures. Overall, by increasing spark.shuffle.memoryFraction to 0.4 and using the fastutil library, the shuffle size dropped from 96GB to 50GB (!) using the default Java serializer (not Kryo).
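
The effect of boxing on serialized size can be checked without Spark. The sketch below is an illustration, not the code from the job: it compares how many bytes default Java serialization writes for an Integer[] and for an int[] holding the same values. A plain int[] stands in for fastutil's IntArrayList, which is backed by an int[]; the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

class BoxedVsPrimitiveSize {

    // Number of bytes that default Java serialization writes for one object.
    static int serializedSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        int n = 100_000;
        Integer[] boxed = new Integer[n];
        int[] primitive = new int[n];
        for (int i = 0; i < n; i++) {
            // Start at 1000 to stay outside the small-value Integer cache,
            // so every boxed element is a distinct object.
            boxed[i] = 1_000 + i;
            primitive[i] = 1_000 + i;
        }
        System.out.println("Integer[]: " + serializedSize(boxed) + " bytes");
        System.out.println("int[]:     " + serializedSize(primitive) + " bytes");
    }
}
```

Each boxed element is written as a separate object with its own per-object overhead, while the primitive array is written as raw 4-byte values, so the int[] stream should come out noticeably smaller. The exact ratio in a real shuffle also depends on the compression codec Spark applies.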

    Alternative: I also tried sorting each int array of an RDD pair and storing the deltas using Hadoop's VIntArrayWritable type (smaller numbers use less space than bigger numbers), but this also required registering VIntWritable and VIntArrayWritable in Kryo, and it didn't save any space after all. In general, I think that Kryo only makes things faster but does not decrease the space needed, although I am still not sure about that.
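
For readers who want to try the sort-and-delta idea outside Hadoop, here is a minimal sketch. It uses a generic 7-bit variable-length encoding, which is an assumption for illustration and not the exact byte format of Hadoop's VIntWritable; the class name is hypothetical. Note that it stores the ids in sorted order, so the original order is lost.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

class DeltaVarIntCodec {

    // Writes a non-negative value 7 bits at a time; the high bit marks "more bytes follow".
    static void writeVarLong(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Reads one value written by writeVarLong; pos[0] is advanced past it.
    static long readVarLong(byte[] data, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = data[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
            shift += 7;
        }
    }

    // Layout: element count, zigzag-encoded first id, then the gaps between sorted ids.
    static byte[] encode(int[] ids) {
        int[] sorted = ids.clone();
        Arrays.sort(sorted);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarLong(out, sorted.length);
        long previous = 0;
        for (int i = 0; i < sorted.length; i++) {
            long value = sorted[i];
            // The first id may be negative, so it is zigzag-encoded; gaps are never negative.
            long toWrite = (i == 0) ? (value << 1) ^ (value >> 63) : value - previous;
            writeVarLong(out, toWrite);
            previous = value;
        }
        return out.toByteArray();
    }

    // Returns the ids in ascending order.
    static int[] decode(byte[] data) {
        int[] pos = {0};
        int n = (int) readVarLong(data, pos);
        int[] ids = new int[n];
        long previous = 0;
        for (int i = 0; i < n; i++) {
            long raw = readVarLong(data, pos);
            long value = (i == 0) ? (raw >>> 1) ^ -(raw & 1) : previous + raw;
            ids[i] = (int) value;
            previous = value;
        }
        return ids;
    }

    public static void main(String[] args) {
        int[] ids = {1005, -3, 1000, 1001, 70000};
        byte[] packed = encode(ids);
        System.out.println(packed.length + " bytes instead of " + 4 * ids.length);
        System.out.println(Arrays.toString(decode(packed)));
    }
}
```

How much this saves depends on how close the sorted ids are to each other, and any gain can be masked by the serializer's own overhead and by shuffle compression, which may be what happened in my experiment.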

    I am not marking this answer as accepted yet, because someone else might have a better idea, and because I didn't use Kryo after all, which is what my original post asked about. I hope that reading it will help someone else with the same issue. I will update this answer if I manage to reduce the shuffle size further.