Tags: scala, apache-spark, dataset, rdd

Merging multiple RDDs


I'm working on an Apache Spark project. I have a dataset of Amazon product reviews. Each element has fields like userId, productId, score, helpfulness and so on, most of which are probably not relevant to my problem.
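
For the purposes of this question, a record can be thought of as something like the following case class (a simplified sketch; the exact fields and types in my project may differ):

    // Simplified sketch of a review record; only the fields used below matter.
    // helpfulness is assumed to be the fraction of helpful votes, between 0 and 1.
    case class Movie(
        userId: String,
        productId: String,
        score: Double,
        helpfulness: Double)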

First I had to create an RDD containing (userId, helpfulness) tuples relative to a specific productId; in particular, the final helpfulness is not just the one the user got on that review, but an average with other users' values as well.

I then want to calculate the average final helpfulness across all products for each user. The function that calculates the result relative to a single product is pageRankOneMovie. I thought the solution was to use a flatMap over the collection of productIds, like this:

 val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)

However, I run into error SPARK-5063, because by calling pageRankOneMovie inside a flatMap I'm nesting RDD transformations.

I've studied a bit about broadcast variables and accumulators and I think I could build something that works; however I would like to know if there is a specific solution to my problem, because it looks really simple to me: I need to programmatically create a sequence of RDDs and then merge them together.
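
For context on what I mean by broadcast variables: the general pattern would be to collect a small piece of data on the driver and ship it to the executors as a read-only value, roughly like this (someSmallRdd is hypothetical, just to illustrate the API; context is the SparkContext):

    // Broadcast pattern sketch: materialize a small RDD on the driver, broadcast it,
    // and read it inside a transformation without nesting RDD operations.
    val smallLookup: Map[String, Double] = someSmallRdd.collect().toMap  // must fit in driver memory
    val lookupBc = context.broadcast(smallLookup)

    val adjusted = movies.map { mov =>
      // lookupBc.value is available on the executors
      (mov.userId, lookupBc.value.getOrElse(mov.productId, 0.0))
    }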

For reference, this is the program I'm trying to run (compiles fine, gets 5063 runtime error):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object PageRank {

def pageRankOneMovie(movies : RDD[Movie], productId : String) : RDD[(String, Double)] = {
    val helpfulness = userHelpfulness(movies)
                .filter { case (_,value) => !value.isEmpty }
                .mapValues { _.get}

    val average = helpfulnessByScore(movies, productId)

    val reviews = movies.filter(_.productId == productId).map( mov => (mov.userId, mov.score))
    val reviewHelpfulness = reviews.join(helpfulness).map { case (id, (score, help)) => (score, (id, help)) }

    reviewHelpfulness.join(average).map {
        case (score, ((id, help), averageHelpfulness)) =>
            (id, if (help < averageHelpfulness) (help+averageHelpfulness)/2 else help)
    }
}

def compute(movies: RDD[Movie], context: SparkContext) : RDD[(String, Double)] = {
    val moviesProductId = movies.map(_.productId).distinct

    val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)

    val average = userHelpfulnessRankings
        .aggregateByKey((0.0, 0))(
            (acc, value) => (acc._1 + value, acc._2 + 1),
            (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

    average.map { case (userId, acc) => (userId, acc._1 / acc._2) }
}
}
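
For completeness, userHelpfulness and helpfulnessByScore are helper functions I haven't included; based on how they are used above, their signatures look roughly like this (signatures only, assuming score is a Double):

    // Per-user average helpfulness; None for users whose helpfulness can't be computed.
    def userHelpfulness(movies: RDD[Movie]): RDD[(String, Option[Double])] = ???

    // Average review helpfulness for one product, keyed by the score assigned.
    def helpfulnessByScore(movies: RDD[Movie], productId: String): RDD[(Double, Double)] = ???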

The dataset I'm using is from https://snap.stanford.edu/data/web-Movies.html
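
The raw data there comes as blocks of key: value lines (product/productId, review/userId, review/helpfulness as a fraction like "7/9", review/score, ...). A rough sketch of turning one such block into the Movie sketch above, ignoring error handling, could be:

    // Parse one review block ("key: value" lines) into a Movie.
    // Field names follow the web-Movies dump; malformed blocks yield None.
    def parseReview(block: Seq[String]): Option[Movie] = {
      val fields = block.flatMap { line =>
        line.split(": ", 2) match {
          case Array(k, v) => Some(k -> v)
          case _           => None
        }
      }.toMap

      for {
        user    <- fields.get("review/userId")
        product <- fields.get("product/productId")
        score   <- fields.get("review/score").map(_.toDouble)
        helpRaw <- fields.get("review/helpfulness")        // e.g. "7/9"
        help     = helpRaw.split("/") match {
                     case Array(num, den) if den.toInt > 0 => num.toDouble / den.toInt
                     case _                                => 0.0
                   }
      } yield Movie(user, product, score, help)
    }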


Solution

  • Ok, it seems there is no generic solution to this problem. Apparently there are only two ways to fix the situation:

    1. Either collect the individual results in a for loop and then keep working from there (sketched below), or
    2. compute all the results together, in a single transformation sequence.

    Since the first solution requires collecting a potentially large amount of data from the workers to the driver, I've opted for the second idea.
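
    For reference, the first option would have looked roughly like this (a sketch using pageRankOneMovie and moviesProductId from the question; everything happens on the driver after the collects):

        // Option 1 sketch: run pageRankOneMovie per product, collect each partial result
        // on the driver, then merge and average the helpfulness values per user there.
        val perProduct: Seq[(String, Double)] =
          moviesProductId.collect().toSeq.flatMap { id =>
            pageRankOneMovie(movies, id).collect()
          }

        val merged: Map[String, Double] =
          perProduct.groupBy(_._1)
            .map { case (userId, values) => (userId, values.map(_._2).sum / values.size) }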

    Basically, instead of isolating a single productId from the start, I use (score, productId) tuples as keys, keeping track of multiple movies as I go. The final function is the following.

     def pageRankAllMovies(movies : RDD[Movie]) = {
        // Average helpfulness of each user
        // (userId, helpfulness (between 0 and 1))
        val helpfulness = userHelpfulness(movies)
                    .filter { case (_,value) => !value.isEmpty }
                    .mapValues { _.get}
    
        // Average helpfulness of the reviews for each movie, based on the assigned score
        // ((score, productId), helpfulness) for a single productId
        val average = helpfulnessByScore(movies)
    
        val reviews = movies.map( mov => (mov.userId, (mov.score, mov.productId)))
        val reviewHelpfulness = reviews.join(helpfulness).map { case (id, (score, help)) => (score, (id, help)) }
    
        // For each "group" of reviews of the same movie that assign the same score,
        // raise the users' helpfulness based on the movie's average
        val globalUserHelpfulness = reviewHelpfulness.join(average).map {
            case (score, ((id, help), averageHelpfulness)) =>
                (id, if (help < averageHelpfulness) (help+averageHelpfulness)/2 else help)
        }
    
        // If we consider more than one movie, there are several helpfulness values
        // per user at the end. Take their average.
        globalUserHelpfulness
            .aggregateByKey((0.0, 0))(
                (acc, value) => (acc._1 + value, acc._2 + 1),
                (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
            .map { case (userId, help) => (userId, help._1 / help._2) }
    }
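
    With this version the driver-side orchestration disappears: compute boils down to a single call, something like

        def compute(movies: RDD[Movie], context: SparkContext): RDD[(String, Double)] =
            pageRankAllMovies(movies)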
    

    tl;dr: either collect all your results in a loop or manage all the computation in one sequence of transformations.