apache-spark, apache-spark-sql, apache-spark-mllib, scala-breeze

Spark parallel processing of grouped data


Initially, I had a lot of data, but using Spark SQL, and especially groupBy, it could be trimmed down to a manageable size (it fits in the RAM of a single node).
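
Concretely, the trimming step looks roughly like this minimal sketch (the SparkContext sc, the Record case class, and the column names are placeholders, not my real schema):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Placeholder schema standing in for the real, much larger input.
    case class Record(key: Int, value: Double)
    val df = sc.parallelize(Seq(Record(1, 2.0), Record(1, 3.0), Record(2, 5.0))).toDF()

    // groupBy + aggregation shrinks the data to one row per key.
    val grouped = df.groupBy("key").agg(sum("value").as("total"), count("value").as("n"))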

How can I perform functions (in parallel) on all the groups (distributed among my nodes)?

How can I make sure that the data for a single group is collected on a single node? E.g., I will probably want to use a local matrix for the computation, but I do not want to run into errors regarding data locality.
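
To make the second question concrete, here is a rough sketch of the kind of per-group local computation I mean, using groupByKey to pull each group's rows into one partition and Breeze for the local matrix (the schema is a placeholder, as above):

    import breeze.linalg.DenseMatrix

    // Placeholder: rows keyed by group id, each row a fixed-length Array[Double].
    val rows = sc.parallelize(Seq(
      (1, Array(1.0, 2.0)), (1, Array(3.0, 4.0)), (2, Array(5.0, 6.0))))

    // groupByKey shuffles all rows of a group into a single partition, so the
    // function below runs locally on one node and can safely build a local matrix.
    val perGroup = rows.groupByKey().mapValues { it =>
      val rowArrays = it.toArray
      val nRows = rowArrays.length
      val nCols = rowArrays.head.length
      // DenseMatrix's (rows, cols, data) constructor expects column-major data.
      val m = new DenseMatrix(nRows, nCols, rowArrays.transpose.flatten)
      breeze.linalg.sum(m) // stand-in for the real per-group computation
    }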


Solution

  • Let's say you have x executors (in your case, probably one executor per node), and you want to partition the data on your keys in such a way that each key falls into a unique bucket, which would be something like a perfect partitioner. There is no generic way of doing that, but it may be possible to achieve if there is some inherent distribution or logic specific to your data.

    I dealt with a specific case where I found that Spark's built-in hash partitioner was not distributing the keys uniformly, so I wrote a custom partitioner using Guava, like this:

      import com.google.common.hash.Hashing

      class FooPartitioner(partitions: Int) extends org.apache.spark.HashPartitioner(partitions) {
        override def getPartition(key: Any): Int = {
          val hasher = Hashing.murmur3_32().newHasher()
          // Use Guava's consistent hashing to map the key's hash onto one of
          // `partitions` buckets instead of Spark's default modulo scheme.
          Hashing.consistentHash(
            key match {
              case i: Int => hasher.putInt(i).hash.asInt()
              case _      => key.hashCode
            }, partitions)
        }
      }
    

    Then I added this partitioner instance as an argument to the combineByKey that I was using, so that the resulting RDD is partitioned in this fashion. This does a good job of distributing the data across x buckets, but I guess there are no guarantees that each bucket will contain only one key.
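
    For illustration, passing the partitioner to combineByKey could look like the sketch below; the pair RDD, its Double values, and the bucket count are made up, not the actual job:

      // Hypothetical pair RDD keyed by Int; the combiner just collects values per key.
      val rdd = sc.parallelize(Seq((1, 2.0), (1, 3.0), (2, 5.0)))

      val combined = rdd.combineByKey(
        (v: Double) => List(v),                        // createCombiner
        (acc: List[Double], v: Double) => v :: acc,    // mergeValue
        (a: List[Double], b: List[Double]) => a ::: b, // mergeCombiners
        new FooPartitioner(8)                          // hypothetical bucket count
      )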

    In case you are on Spark 1.6 and using DataFrames, you can define a UDF like this:

      val hasher = udf((i: Int) =>
        Hashing.consistentHash(
          Hashing.murmur3_32().newHasher().putInt(i).hash.asInt(), PARTITION_SIZE))

    and then do dataframe.repartition(hasher(keyThatYouAreUsing)). Hopefully this provides a hint to get started.
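
    Put together as a self-contained sketch (the SparkContext sc, the DataFrame, its "key" column, and the bucket count are all hypothetical placeholders):

      import com.google.common.hash.Hashing
      import org.apache.spark.sql.SQLContext
      import org.apache.spark.sql.functions.udf

      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val PARTITION_SIZE = 8 // hypothetical bucket count, e.g. number of executors

      // Consistent-hash UDF over an Int key column, mirroring the partitioner above.
      val hasher = udf((i: Int) =>
        Hashing.consistentHash(
          Hashing.murmur3_32().newHasher().putInt(i).hash.asInt(), PARTITION_SIZE))

      val df = sc.parallelize(Seq((1, 2.0), (2, 3.0))).toDF("key", "value")
      val repartitioned = df.repartition(hasher($"key"))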