I have a performance problem with a code I'm revisioning, everytime will give an OOM
while performing a count.
I think I found the problem, basically after keyBy
tranformation, being executed aggregateByKey.
The problem lies to the fact that almost 98% of the RDD elements has the same key, so aggregationByKey, generate shuffle, putting nearly all records into the same partition, bottom line: just few executors works, and has to much memory pressure.
This is the code:
val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
.keyBy(po => po.getProcessCreator.name)
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
.map {case(name,list) =>
val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
lastOfGroupByKeys.flatMap(f => f._2)
}
.flatMap(f => f)
log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)
I would a way to perform operation in a more parallel way, allowing all executors to work nearly equally. How can I do that?
Should I have to use a custom partitioner?
If your observation is correct and
98% of the RDD elements has the same key
then change of partitioner won't help you at all. By the definition of the partitioner 98% of the data will have to be processed by a single executor.
Luckily bad code is probably the bigger problem here than the skew. Skipping over:
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
which is just a folk magic it looks like the whole pipeline can be rewritten as a simple reuceByKey
. Pseudocode:
Combine name
and local keys into a single key:
def key(po: AnomalyPO) = (
// "major" key
po.getProcessCreator.name,
// "minor" key
po.getPodId, po.getAnomalyCode,
po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID
)
Key containing name, date and additional fields should have much higher cardinality than the name alone.
Map to pairs and reduce by key:
rddAnomalies
.map(po => (key(po), po))
.reduceByKey((x, y) =>
if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y
)