Tags: apache-spark, mahout, mahout-recommender, collaborative-filtering

Mahout spark-itemsimilarity saveAsTextFile final stage is very slow


I'm using Mahout 0.11.0 on Spark 1.5.1 in YARN client mode on an HDP 2.2 cluster, run from the CLI. My input is about 325 MB, partitioned into 1000 part files. Here's the exact command I invoke:

$MAHOUT_HOME/bin/mahout spark-itemsimilarity \
  --input unit-similarity-dump/bpc1 \
  --output mahout-cooccurrence-output4 \
  --maxPrefs 200 --maxSimilaritiesPerItem 100 \
  --master yarn-client --sparkExecutorMem 10g \
  -D:spark.yarn.executor.memoryOverhead=1024 \
  -D:spark.executor.cores=5 \
  -D:spark.executor.instances=50 \
  -D:spark.yarn.am.memory=4g \
  -D:spark.yarn.am.memoryOverhead=512 \
  -D:spark.yarn.am.cores=2 \
  -D:spark.driver.memory=20g \
  -D:spark.yarn.driver.memoryOverhead=2048 \
  -D:spark.driver.cores=4 \
  -D:spark.driver.maxResultSize=10g \
  -D:spark.yarn.queue=product \
  -D:hdp.version=2.2.6.0-2800

The application hums along nicely until the final stage, in which saveAsTextFile gets called. At this point the tasks grind to a halt, each taking somewhere between 45 minutes and an hour to succeed. Upon closer inspection, it seems that each task is reading all 1000 partitions of the MapPartitionsRDD, which, intuitively, I think must be the source of the performance issue. These partitions are spread fairly evenly across all of the executors, so I would think each task needs to request partitions from the n-1 executors that aren't its direct parent.

What's the best way to optimize this application? Fewer partitions, so there's less remote data to request? Fewer executors, so a higher percentage of the data is local to each task? Forcing a higher replication factor for the RDD? Right now it seems to default to Storage Level: Memory Deserialized 1x Replicated, 100% cached.
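For reference, here is what those three options would look like in plain Spark terms (the paths, counts, and names below are just placeholders; I can't actually inject these calls into the spark-itemsimilarity job, this is only to clarify what I mean by each option):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Hypothetical illustration only: these calls show what "fewer partitions",
    // "fewer executors", and "higher replication" mean in Spark terms.
    object PartitionKnobs {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("partition-knobs"))

        // Stand-in for the 1000-partition input from the question.
        val rdd = sc.textFile("unit-similarity-dump/bpc1")

        // Fewer partitions: collapse 1000 partitions to 100 without a full shuffle.
        val fewerParts = rdd.coalesce(100)

        // Higher replication: keep each cached partition on two executors,
        // instead of the default "Memory Deserialized 1x Replicated".
        val replicated = fewerParts.persist(StorageLevel.MEMORY_ONLY_2)

        // Fewer executors is a submit-time setting rather than an RDD call,
        // e.g. -D:spark.executor.instances=10 on the mahout command line.
        replicated.saveAsTextFile("partition-knobs-output")

        sc.stop()
      }
    }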

Here is a screenshot of the stage details for clarity: [screenshot: saveAsTextFile stage]

Thanks in advance for any insights.

Update:

I tried using just 1 executor with multiple cores (i.e. multiple concurrent tasks), and although all of the RDD partitions are present on a single, local executor, the performance is still very slow. I think the only culprit left is the shuffle caused by reduceByKey in the final saveAsTextFile DAG.

Second Update:

I also tried using just 1 input partition, whereas I'd previously been using 100 or even 1000. The results were quite similar, and are summarized here. For clarity, in this run I used a single 20g executor with 5 cores. This approach did, however, result in about an order of magnitude less aggregate resource allocation (as measured in MB-seconds and vcore-seconds). This is probably due to the over-allocation of executors and threads in previous runs, and implies that the bottleneck may not be computation-bound.


Solution

  • Not sure I follow all of the description above. There is a BiMap bi-directional dictionary that converts column and row ids from the ordinal Mahout ids to the external string ids. These data structures are in-memory and amount to two hashmaps per type of id (row/column). The reduceByKey works on the Mahout ids, so the translation happens only during input and output. The dictionaries are read into the driver and then broadcast to each node; only one copy is made per node, where the BiMap (actually a BiDictionary) is shared by the executors.

    The partitioning is set to "auto" by default, which in Mahout 0.11 should be a value optimized for the cooccurrence calculation, and is why things "hum along".

    The final step after the reduceByKey takes each value in the remaining matrix (row key, vector), converts each id in the key and the vector elements back into a string, and writes the text out to the files in parallel.

    Frankly, I have found reading and writing text files to be hugely dependent on hand tweaking. My main experience was with reading in parallel, where Spark reads the stats of every file before partitioning -- all of them. This is incredibly slow compared with taking the 1000 files and concatenating them into one before reading (try it yourself; they may have fixed this since). A small sketch of doing that concatenation on HDFS is included at the end of this answer.

    It sounds to me like you need a better saveAsTextFile. Hand tweaking of the output might be better done with your own distributed operation: a foreach works after repartitioning the RDD based on your own parameters (a rough sketch of such a writer also follows at the end of this answer). See the docs here: http://spark.apache.org/docs/latest/programming-guide.html#printing-elements-of-an-rdd

    If you want to experiment, subclass TextDelimitedIndexedDatasetReaderWriter to provide your own writer trait. Mahout also has a mapBlock operation that can be used; it passes a block of rows to each mapBlock function, which you can write out yourself, using the BiMap to convert ids.

    Would love to hear any results on the Mahout User list.
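    On the reading side, the "concatenate the 1000 files into one first" experiment mentioned above can be done directly on HDFS with Hadoop's FileUtil.copyMerge. A minimal sketch, with placeholder paths:

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

        // Merge a directory of many small part files into a single HDFS file
        // before handing it to spark-itemsimilarity. Paths are placeholders.
        object MergeParts {
          def main(args: Array[String]): Unit = {
            val conf = new Configuration()
            val fs = FileSystem.get(conf)
            FileUtil.copyMerge(
              fs, new Path("unit-similarity-dump/bpc1"),        // source dir with 1000 parts
              fs, new Path("unit-similarity-dump/bpc1-merged"), // single merged output file
              false,                                            // don't delete the source
              conf, null)                                       // no separator appended between files
          }
        }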
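    To make the "better saveAsTextFile" suggestion concrete, here is a minimal sketch of that kind of hand-rolled writer. It assumes the final data is available as a plain pair RDD of (item id, similarity vector) plus a broadcast Int-to-String map standing in for Mahout's BiDictionary; the names, shapes, and partition count are placeholders rather than Mahout's actual API, and the point is only the repartition-then-foreachPartition pattern:

        import org.apache.spark.{SparkConf, SparkContext, TaskContext}
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.{FileSystem, Path}
        import java.io.PrintWriter

        object HandRolledWriter {
          def main(args: Array[String]): Unit = {
            val sc = new SparkContext(new SparkConf().setAppName("hand-rolled-writer"))

            // Toy stand-in for the post-reduceByKey (item id, similarity vector) data.
            val rows = sc.parallelize(Seq(
              (0, Map(1 -> 0.9, 2 -> 0.4)),
              (1, Map(0 -> 0.9))
            ))
            // Stand-in for the broadcast BiDictionary: ordinal id -> external string id.
            val dict = sc.broadcast(Map(0 -> "itemA", 1 -> "itemB", 2 -> "itemC"))
            val outDir = "hand-rolled-output"

            // Repartition to however many output files you want, then let each task
            // translate ids back to strings and write its own part file to HDFS.
            rows.repartition(20).foreachPartition { iter =>
              val fs = FileSystem.get(new Configuration())
              val partId = TaskContext.get.partitionId
              val out = new PrintWriter(fs.create(new Path(f"$outDir/part-$partId%05d")))
              try {
                iter.foreach { case (itemId, vector) =>
                  val values = vector
                    .map { case (id, score) => s"${dict.value(id)}:$score" }
                    .mkString(" ")
                  out.println(s"${dict.value(itemId)}\t$values")
                }
              } finally {
                out.close()
              }
            }

            sc.stop()
          }
        }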