Search code examples
apache-sparkapache-zeppelinapache-spark-mllib

Awfully slow execution on a small datasets – where to start debugging?


I do some experimentation on a MacBook (i5, 2.6GHz, 8GB ram) with Zeppelin NB and Spark in standalone mode. spark.executor/driver.memory both get 2g. I have also set spark.serializer org.apache.spark.serializer.KryoSerializer in spark-defaults.conf, but that seems to be ignored by zeppelin


ALS model

I have trained a ALS model with ~400k (implicit) ratings and want to get recommendations with val allRecommendations = model.recommendProductsForUsers(1)

Sample set

Next I take a sample to play around with

val sampledRecommendations = allRecommendations.sample(false, 0.05, 1234567).cache

This contains 3600 recommendations.

Remove product recommendations that users own

Next I want to remove all ratings for products that a given user already owns, the list I hold in a RDD of the form (user_id, Set[product_ids]): RDD[(Long, scala.collection.mutable.HashSet[Int])]

val productRecommendations = (sampledRecommendations
// add user portfolio to the list, but convert the key from Long to Int first
.join(usersProductsFlat.map( up => (up._1.toInt, up._2) ))
.mapValues(
    // (user, (ratings: Array[Rating], usersOwnedProducts: HashSet[Long]))
    r => (r._1
        .filter( rating => !r._2.contains(rating.product))
        .filter( rating => rating.rating > 0.5)
        .toList
    )
  )
  // In case there is no recommendation (left), remove the entry
  .filter(rating => !rating._2.isEmpty)
).cache

Question 1 Calling this (productRecommendations.count) on the cached sample set generates a stage that includes flatMap at MatrixFactorizationModel.scala:278 with 10,000 tasks, 263.6 MB of input data and 196.0 MB shuffle write. Shouldn't the tiny and cached RDD be used instead and what is going (wr)on(g) here? The execution of the count takes almost 5 minutes!

Question 2 Calling usersProductsFlat.count which is fully cached according to the "Storage" view in the application UI takes ~60 seconds each time. It's 23Mb in size – shouldn't that be a lot faster?

Map to readable form

Next I bring this in some readable form replacing IDs with names from a broadcasted lookup Map to put into a DF/table:

val readableRatings = (productRecommendations
    .flatMapValues(x=>x)
    .map( r => (r._1, userIdToMailBC.value(r._1), r._2.product.toInt, productIdToNameBC.value(r._2.product), r._2.rating))
).cache
val readableRatingsDF = readableRatings.toDF("user","email", "product_id", "product", "rating").cache
readableRatingsDF.registerTempTable("recommendations")

Select … with patience

The insane part starts here. Doing a SELECT takes several hours (I could never wait for one to finish):

%sql
SELECT COUNT(user) AS usr_cnt, product, AVG(rating) AS avg_rating
FROM recommendations
GROUP BY product

Query takes virtually forever


I don't know where to look to find the bottlenecks here, there is obviously some huge kerfuffle going on here! Where can I start looking?


Solution

  • Your number of partitions may be too large. I think you should use about 200 when running in local mode rather than 10000. You can set the number of partitions in different ways. I suggest you edit the spark.default.parallelism flag in the Spark configuration file.