Initially I had a lot of data, but by using Spark SQL, and in particular groupBy, I was able to trim it down to a manageable size (it now fits in the RAM of a single node).
How can I perform functions (in parallel) on all the groups (distributed among my nodes)?
How can I make sure that the data for a single group is collected on a single node? E.g., I will probably want to use a local matrix for the computation, but I do not want to run into errors regarding data locality.
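Roughly, the pattern I have in mind looks like this (just a sketch; the key type and the per-group computation are placeholders):

    import org.apache.spark.rdd.RDD

    // Sketch of the intent: gather each group onto one executor, then run a
    // purely local computation (e.g. building a local matrix) per group.
    def processGroups(rdd: RDD[(Int, Array[Double])]): RDD[(Int, Double)] =
      rdd.groupByKey().mapValues { rows =>
        val local = rows.toArray   // all rows of one group, local to one executor
        local.map(_.sum).sum       // placeholder for the real local-matrix computation
      }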
Let's say you have x executors (in your case probably one executor per node), and you want to partition the data on your keys in such a way that each key falls into a unique bucket, which would be something like a perfect partitioner. There is no generic way of doing that, but it may be achievable if there is some inherent distribution or logic specific to your data.
I dealt with a specific case where I found that Spark's built-in hash partitioner was not doing a good job of distributing the keys uniformly, so I wrote a custom partitioner using Guava, like this:

    import com.google.common.hash.Hashing

    class FooPartitioner(partitions: Int) extends org.apache.spark.HashPartitioner(partitions) {
      override def getPartition(key: Any): Int = {
        val hasher = Hashing.murmur3_32().newHasher()
        // Re-hash the key with murmur3, then map the result onto one of `partitions` buckets
        Hashing.consistentHash(
          key match {
            case i: Int => hasher.putInt(i).hash().asInt()
            case _      => key.hashCode
          },
          partitions)
      }
    }
Then I added this partitioner instance as an argument to the combineByKey that I was using, so that the resulting RDD is partitioned in this fashion. This does a good job of distributing the data across the x buckets, but I guess there are no guarantees that each bucket will contain only one key.
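For example, something along these lines (a sketch; keyedRdd, the Double values and the sum combiners are just assumptions for illustration):

    // Sketch: wiring the custom partitioner into combineByKey so the resulting
    // RDD ends up partitioned by the consistent hash above.
    val numBuckets  = 8                                 // e.g. one bucket per executor
    val partitioner = new FooPartitioner(numBuckets)

    val combined = keyedRdd.combineByKey(
      (v: Double) => v,                                 // createCombiner
      (acc: Double, v: Double) => acc + v,              // mergeValue
      (acc1: Double, acc2: Double) => acc1 + acc2,      // mergeCombiners
      partitioner                                       // custom partitioner
    )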
In case you are on Spark 1.6 and using DataFrames, you can define a UDF like this:

    import com.google.common.hash.Hashing
    import org.apache.spark.sql.functions.{col, udf}

    // PARTITION_SIZE is the desired number of buckets
    val hasher = udf((i: Int) =>
      Hashing.consistentHash(Hashing.murmur3_32().newHasher().putInt(i).hash().asInt(), PARTITION_SIZE))

and then do:

    dataframe.repartition(hasher(col("keyThatYouAreUsing")))
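If you want to sanity-check the distribution, something like this counts the rows per partition after the repartition (just a sketch; adjust the names to your DataFrame):

    // Sketch: rows per partition after repartitioning; roughly even counts mean
    // the consistent hash is spreading the keys well.
    val partitionSizes = dataframe
      .repartition(hasher(col("keyThatYouAreUsing")))
      .rdd
      .mapPartitions(it => Iterator(it.size))
      .collect()

    partitionSizes.foreach(println)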
Hopefully this provides some hints to get you started.