Tags: scala, apache-spark, apache-spark-sql, apache-spark-mllib

How to distribute code and dataset onto worker nodes?


I've been working with the MovieLens dataset (20 million records) and have been using collaborative filtering in Spark MLlib.

[Image: steps of the algorithm]

My environment is Ubuntu 14.04 on VirtualBox. I have one master node and 2 slave nodes. I'm using released versions of Apache Hadoop, Apache Spark, Scala, and sbt. The code is written in Scala.

How to distribute the code and the dataset onto worker nodes?

// Use the RDD-based API: ALS.train and model.predict below come from spark.mllib
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object trainModel extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("trainModel")
  val sc = new SparkContext(conf)

  val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")

  // DataFrame read of the same CSV (kept for reference; not used further below)
  val sqlContext = new SQLContext(sc)
  val df = sqlContext
    .read
    .option("header", "true")
    .format("csv")
    .load("file:///usr/local/spark/dataset/rating.csv")

  // Skip the CSV header, then parse (userId, movieId, rating); mllib's Rating expects a Double
  val header = rawData.first()
  val ratings = rawData.filter(_ != header).map(line => line.split(",").take(3) match {
    case Array(userId, movieId, rating) =>
      Rating(userId.toInt, movieId.toInt, rating.toDouble)
  })
  println(s"Number of Ratings in Movie file ${ratings.count()} \n")

  // Split the parsed ratings into training and test sets
  val splits = ratings.randomSplit(Array(0.8, 0.2), seed = 12345)
  val trainingRatingsRDD = splits(0).cache()
  val testRatingsRDD = splits(1).cache()
  val numTraining = trainingRatingsRDD.count()
  val numTest = testRatingsRDD.count()
  println(s"Training: $numTraining, test: $numTest.")

  val rank = 10
  val lambda = 0.01
  val numIterations = 10
  val model = ALS.train(trainingRatingsRDD, rank, numIterations, lambda)
  // Evaluate the model on the test data
  val userProducts = testRatingsRDD.map { case Rating(userId, movieId, rating) =>
    (userId, movieId)
  }
  val predictions = model.predict(userProducts).map {
    case Rating(userId, movieId, rating) => ((userId, movieId), rating)
  }
  val ratesAndPreds = testRatingsRDD.map { case Rating(userId, movieId, rating) =>
    ((userId, movieId), rating)
  }.join(predictions)
  val meanSquaredError = ratesAndPreds.map { case ((userId, movieId), (r1, r2)) =>
    val err = r1 - r2
    err * err
  }.mean()
  println(s"Mean Squared Error = $meanSquaredError")
  val rmse = math.sqrt(meanSquaredError)
  println(s"RMSE = $rmse")
}

Solution

  • How to distribute code

    That happens when you spark-submit a Spark application. Distribution can be per CPU core/thread or per executor. You don't have to code it yourself; that is why people use Spark, since it happens (almost) automatically.

    conf.setMaster("local[*]")

    That means a single executor with as many threads as you have CPU cores. That is local execution, not a distribution across the cluster.

    You'd be better off removing that line from the code and using spark-submit --master instead. Read the official documentation, esp. Submitting Applications.
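
    For example (a minimal sketch; spark://master:7077 and the jar path below are placeholders for your own master URL and build output), keep the master out of the code and pass it at submit time:

    val conf = new SparkConf().setAppName("trainModel") // no setMaster here
    val sc = new SparkContext(conf)

    // Submitted with something like:
    // spark-submit \
    //   --class trainModel \
    //   --master spark://master:7077 \
    //   target/scala-2.11/trainmodel_2.11-1.0.jar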

    ...and the dataset onto worker nodes?

    val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")

    That line says where the MovieLens dataset (rating.csv) is read from and hence how it is distributed. It has nothing to do with Spark, as Spark uses whatever distribution the underlying file system provides.

    In other words, on Hadoop HDFS with a 256MB block size (split), a file twice the block size is stored as 2 blocks. That's how HDFS makes the file distributed and fault-tolerant.

    When Spark reads the 2-split file, the distributed computation (described using RDD) will use 2 partitions and so 2 tasks.
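
    A quick way to see this in action (a sketch; the hdfs:///datasets/movielens path is just an assumed location):

    // Each HDFS block (split) of the input becomes one RDD partition,
    // and each partition is processed by one task.
    val ratingsText = sc.textFile("hdfs:///datasets/movielens/rating.csv")
    println(s"Number of partitions: ${ratingsText.getNumPartitions}")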

    HDFS is a file system / storage layer, so pick any location and hdfs dfs -put the dataset there. Think of HDFS as any file system you have remote access to. Use that location as the input parameter of sc.textFile and you're done.
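
    For example (a sketch; /datasets/movielens is just an assumed HDFS directory):

    // Shell, run once outside Spark -- copy the dataset into HDFS:
    //   hdfs dfs -mkdir -p /datasets/movielens
    //   hdfs dfs -put /usr/local/spark/dataset/rating.csv /datasets/movielens/
    // Then read it from HDFS instead of the local file system:
    val rawData = sc.textFile("hdfs:///datasets/movielens/rating.csv")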