I've been working with the MovieLens dataset (20 million records) and have been using collaborative filtering in Spark MLlib.
My environment is Ubuntu 14.04 on VirtualBox. I have one master node and 2 slave nodes. I used the released Apache Hadoop, Apache Spark, Scala and sbt. The code is written in Scala.
How do I distribute the code and the dataset onto the worker nodes?
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object trainModel extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("trainModel")
  val sc = new SparkContext(conf)

  val sqlContext = new SQLContext(sc)
  val df = sqlContext
    .read
    .option("header", "true")
    .format("csv")
    .load("file:///usr/local/spark/dataset/rating.csv")

  // Parse userId, movieId and rating from each line, skipping the CSV header
  val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
  val header = rawData.first()
  val ratings = rawData
    .filter(_ != header)
    .map(line => line.split(",").take(3) match {
      case Array(userId, movieId, rating) =>
        Rating(userId.toInt, movieId.toInt, rating.toDouble)
    })
  println(s"Number of ratings in movie file: ${ratings.count()}\n")

  // Split the data into training and test sets
  val splits = ratings.randomSplit(Array(0.8, 0.2), seed = 12345)
  val trainingRatingsRDD = splits(0).cache()
  val testRatingsRDD = splits(1).cache()
  val numTraining = trainingRatingsRDD.count()
  val numTest = testRatingsRDD.count()
  println(s"Training: $numTraining, test: $numTest.")

  val rank = 10
  val lambda = 0.01
  val numIterations = 10
  val model = ALS.train(ratings, rank, numIterations, lambda)

  // Evaluate the model on the training data
  val userProducts = ratings.map { case Rating(userId, movieId, rating) =>
    (userId, movieId)
  }
  val predictions = model.predict(userProducts).map {
    case Rating(userId, movieId, rating) => ((userId, movieId), rating)
  }
  val ratesAndPreds = ratings.map { case Rating(userId, movieId, rating) =>
    ((userId, movieId), rating)
  }.join(predictions)

  val meanSquaredError = ratesAndPreds.map { case ((userId, movieId), (r1, r2)) =>
    val err = r1 - r2
    err * err
  }.mean()
  println(s"Mean Squared Error = $meanSquaredError")

  val rmse = math.sqrt(meanSquaredError)
  println(s"RMSE = $rmse.")
}
How to distribute code
That happens when you spark-submit a Spark application. Distribution can be per CPU core/thread or per executor, and you don't have to code it yourself. That is why people use Spark: it should happen (almost) automatically.
conf.setMaster("local[*]")
That says to use a single executor with as many threads as you've got CPU cores. That's a local, single-JVM deployment, not a distributed one.
You'd be better off removing that line from the code and using spark-submit --master instead. Read the official documentation, esp. Submitting Applications.
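For example, a minimal sketch of what that could look like with a standalone master (the class name, jar path and master host below are placeholders, not values from your setup):

// master is supplied by spark-submit, so don't hard-code it
val conf = new SparkConf().setAppName("trainModel")
val sc = new SparkContext(conf)

and then submit the packaged application:

spark-submit --class trainModel --master spark://<master-host>:7077 target/scala-2.11/trainmodel_2.11-1.0.jar

7077 is the default port of the Spark standalone master; on YARN you'd pass --master yarn instead.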
...and the dataset onto worker nodes?
val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
That line says how the MovieLens dataset (rating.csv) is distributed. It has nothing to do with Spark itself, because Spark uses whatever distribution the underlying file system provides.
In other words, on Hadoop HDFS with a 256 MB block size (split), a file twice that size is stored as 2 blocks. That's HDFS making the file distributed and fault-tolerant.
When Spark reads the 2-split file, the distributed computation (described using RDDs) will use 2 partitions and therefore 2 tasks.
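You can check that directly on the RDD; the HDFS path here is only a placeholder:

val lines = sc.textFile("hdfs:///datasets/movielens/rating.csv")  // placeholder path
println(s"Partitions: ${lines.getNumPartitions}")  // e.g. 2 for a 2-split file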
HDFS is a file system / storage, so pick any location and hdfs dfs -put the dataset there. Think of HDFS as any file system you have remote access to. Use that location as the input parameter of sc.textFile and you're done.
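A rough sketch of the two steps (the target directory, host name and port are assumptions; use whatever fs.defaultFS says in your core-site.xml):

hdfs dfs -mkdir -p /datasets/movielens
hdfs dfs -put /usr/local/spark/dataset/rating.csv /datasets/movielens/

and in the application:

val rawData = sc.textFile("hdfs://<namenode-host>:9000/datasets/movielens/rating.csv")

With fs.defaultFS configured you can also drop the host and port and just use hdfs:///datasets/movielens/rating.csv.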