
Avoid unbalanced partitions in Spark


I have a performance problem with some code I'm revising; it throws an OOM every time it performs a count. I think I found the problem: after the keyBy transformation, aggregateByKey is executed. The problem is that almost 98% of the RDD elements have the same key, so aggregateByKey generates a shuffle that puts nearly all records into the same partition. Bottom line: only a few executors do any work, and they are under too much memory pressure.

This is the code:

val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
    .keyBy(po => po.getProcessCreator.name)
    .aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
    .map {case(name,list) =>
      val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
      val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
      lastOfGroupByKeys.flatMap(f => f._2)
    }
    .flatMap(f => f)

log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)

I would like a way to perform the operation in a more parallel fashion, so that all executors work nearly equally. How can I do that?

Should I use a custom partitioner?


Solution

  • If your observation is correct and

    98% of the RDD elements have the same key

    then a change of partitioner won't help you at all. By the definition of a partitioner, all records with a given key go to the same partition, so 98% of the data will have to be processed by a single executor.

    Luckily, the bad code is probably a bigger problem here than the skew. Skipping over:

    .aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
    

    which is just folk magic, it looks like the whole pipeline can be rewritten as a simple reduceByKey. Pseudocode:

    • Combine name and local keys into a single key:

      def key(po: AnomalyPO) = (
        // "major" key
        po.getProcessCreator.name, 
        // "minor" key
        po.getPodId, po.getAnomalyCode,
        po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID
      )
      

      A key containing the name, the date and the additional fields should have a much higher cardinality than the name alone.

    • Map to pairs and reduce by key:

      rddAnomalies
        .map(po => (key(po), po))
        .reduceByKey((x, y) => 
          if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y
        )