When attempting to run my method:
def doGD() = {
  allRatings.foreach(rating => gradientDescent(rating))
}
I get the error: org.apache.spark.SparkException: Task not serializable
I understand that my gradient descent is not going to parallelise, because each step depends on the previous one, so running it in parallel is not an option. However, if I do this from the console:
val gd = new GradientDescent()
gd.doGD();
I get the error as mentioned.
However, if in the Console I do this:
val gd = new GradientDescent()
gd.allRatings.foreach(rating => gradientDescent(rating))
It works perfectly fine. As you may have noticed, the second example runs the same code as the first; the only difference is that instead of calling the method, I take the code out of the method and run it directly.
Why does the one work and the other does not? I'm bemused.
(Additional note: class GradientDescent extends Serializable.)
The gradientDescent method:
def gradientDescent(rating: Rating) = {
  var userVector = userFactors.get(rating.user).get
  var itemVector = itemFactors.get(rating.product).get
  userFactors.map(x => if (x._1 == rating.user) (x._1, x._2 += 0.02 * (calculatePredictionError(rating.rating, userVector, itemVector) * itemVector)))
  userVector = userFactors.get(rating.user).get // updated user vector
  itemFactors.map(x => if (x._1 == rating.product) (x._1, x._2 += 0.02 * (calculatePredictionError(rating.rating, userVector, itemVector) * itemVector)))
}
I know I'm using two vars stored on the master, userFactors and itemFactors, and since the process is sequential, parallelising is not possible. But this doesn't explain why calling the method from the console fails while writing the body of the method directly in the console works.
Hard to tell without the full source of the GradientDescent class, but you're probably capturing an unserializable value. Inside doGD, the closure you pass to foreach calls this.gradientDescent, so Spark has to serialize the entire enclosing object and ship it to the workers; if any of its fields is not serializable, the task fails. The inlined version doesn't drag the whole object into the closure, so it serializes fine.
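You can reproduce the difference outside Spark with plain Java serialization, since Spark uses the same mechanism to ship closures. This is a minimal sketch with hypothetical names (Driver, helper, the plain Object field stand in for your class and whatever unserializable state it holds): a lambda that calls a method on this captures the whole enclosing instance, while a standalone lambda captures nothing.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-side class holding something
// that cannot be serialized (in Spark this is often a SparkContext field).
class Driver {
  val notSerializable = new Object // plain Object is not Serializable

  private def helper(x: Int): Int = x * 2

  // Calls a method on `this`, so the lambda captures the whole Driver,
  // including its unserializable field. This mirrors calling
  // gradientDescent inside doGD.
  def capturingClosure: Int => Int = x => helper(x)

  // Self-contained lambda: nothing from `this` is referenced,
  // so nothing from `this` is captured.
  def standaloneClosure: Int => Int = (x: Int) => x * 2
}

object ClosureDemo {
  // Returns true if the object survives Java serialization,
  // which is what Spark attempts when it ships a task.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val d = new Driver
    println(canSerialize(d.capturingClosure))  // false: drags in Driver
    println(canSerialize(d.standaloneClosure)) // true
  }
}
```

So the usual fixes are either to copy the fields the closure needs into local vals inside the method before calling foreach, or to move the per-element logic into a function that doesn't reference the enclosing instance (e.g. a companion object).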