I'm working on an Apache Spark project. I have a dataset of Amazon product reviews. Each element has fields like userId, productId, score, helpfulness and so on - I don't think they're all relevant to my problem.
First I had to create an RDD of (userId, helpfulness) tuples relative to a specific productId; in particular, the final helpfulness is not just the one the user got on that review, but an average with other users' as well.
I then want to calculate the average final helpfulness across all products for each user. The function that computes the result for a single product is pageRankOneMovie. I thought the solution was to use a flatMap over the collection of productIds, like this:
val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)
However, I run into error SPARK-5063: by calling pageRankOneMovie inside a flatMap I'm nesting transformations, which Spark doesn't allow.
I've studied a bit about broadcast variables and accumulators, and I think I could build something that works. However, I'd like to know whether there is a specific solution to this problem, because it looks really simple to me: I need to programmatically create a sequence of RDDs and then merge them together.
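Concretely, what I'd like to express is something like this (a sketch only; it reuses the names from the program below, plus SparkContext.union to merge the per-product RDDs):

```scala
// Sketch of the idea: build one RDD per product on the driver,
// then merge them all with SparkContext.union.
val perProduct: Seq[RDD[(String, Double)]] =
  movies.map(_.productId).distinct.collect.toSeq
    .map(id => pageRankOneMovie(movies, id))

val merged: RDD[(String, Double)] = context.union(perProduct)
```

Unlike the flatMap version, the loop here runs on the driver, so no transformation is nested inside another.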
For reference, this is the program I'm trying to run (it compiles fine but fails at runtime with the 5063 error):
object PageRank {
  def pageRankOneMovie(movies: RDD[Movie], productId: String): RDD[(String, Double)] = {
    val helpfulness = userHelpfulness(movies)
      .filter { case (_, value) => !value.isEmpty }
      .mapValues { _.get }

    val average = helpfulnessByScore(movies, productId)

    val reviews = movies.filter(_.productId == productId).map(mov => (mov.userId, mov.score))
    val reviewHelpfulness = reviews.join(helpfulness).map { case (id, (score, help)) => (score, (id, help)) }

    reviewHelpfulness.join(average).map {
      case (score, ((id, help), averageHelpfulness)) =>
        (id, if (help < averageHelpfulness) (help + averageHelpfulness) / 2 else help)
    }
  }

  def compute(movies: RDD[Movie], context: SparkContext): RDD[(String, Double)] = {
    val moviesProductId = movies.map(_.productId).distinct

    val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)

    val average = userHelpfulnessRankings
      .aggregateByKey((0.0, 0))(
        (acc, value) => (acc._1 + value, acc._2 + 1),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

    average.map { case (userId, acc) => (userId, acc._1 / acc._2) }
  }
}
The dataset I'm using is from https://snap.stanford.edu/data/web-Movies.html
Ok, it seems there is no generic solution to this problem. Apparently there are only two ways to fix the situation:

- collect the single results in a for loop and then keep working from there, or
- restructure the whole computation into one sequence of transformations that covers all products at once.

Since the first solution requires collecting a potentially big amount of data from the workers to the driver, I opted for the second idea.
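For completeness, the first option would look roughly like this (a sketch, assuming the pageRankOneMovie from the question and a SparkContext called context):

```scala
// Sketch of the collect-in-a-loop option (names assumed from the question).
// Each per-product result is collected to the driver, then the merged list
// is parallelized again to continue with RDD transformations.
val productIds = movies.map(_.productId).distinct.collect

val merged: Seq[(String, Double)] = productIds.toSeq.flatMap { id =>
  pageRankOneMovie(movies, id).collect.toList
}

val userHelpfulnessRankings = context.parallelize(merged)
```

This works, but every intermediate result travels through the driver, which is exactly the cost that made me discard it.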
Basically, instead of isolating a single productId from the start, I use (score, productId) tuples as keys, keeping track of multiple movies at once. The final function is the following.
def pageRankAllMovies(movies: RDD[Movie]) = {
  // Average user helpfulness
  // (userId, helpfulness between 0 and 1)
  val helpfulness = userHelpfulness(movies)
    .filter { case (_, value) => !value.isEmpty }
    .mapValues { _.get }

  // Average review helpfulness per movie, by assigned score
  // ((score, productId), helpfulness)
  val average = helpfulnessByScore(movies)

  val reviews = movies.map(mov => (mov.userId, (mov.score, mov.productId)))
  val reviewHelpfulness = reviews.join(helpfulness).map { case (id, (score, help)) => (score, (id, help)) }

  // For each "group" of reviews of the same movie that assign the same score,
  // pull the users' helpfulness up towards the movie's average
  val globalUserHelpfulness = reviewHelpfulness.join(average).map {
    case (score, ((id, help), averageHelpfulness)) =>
      (id, if (help < averageHelpfulness) (help + averageHelpfulness) / 2 else help)
  }

  // If we consider more than one movie, at the end there are multiple
  // helpfulness values per user, so we take their average
  globalUserHelpfulness
    .aggregateByKey((0.0, 0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
    .map { case (userId, help) => (userId, help._1 / help._2) }
}
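With this version the whole job reduces to a single call (hypothetical usage; it assumes movies: RDD[Movie] has already been loaded from the dataset):

```scala
// Hypothetical driver code: run the all-movies version and inspect a few results
val userRanking: RDD[(String, Double)] = pageRankAllMovies(movies)
userRanking.take(10).foreach(println)  // (userId, average final helpfulness) pairs
```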
tl;dr: either collect all your results in a loop, or manage the whole computation in one sequence of transformations.