I have the following Python test code (the arguments to ALS.train are defined elsewhere):
r1 = (2, 1)
r2 = (3, 1)
test = sc.parallelize([r1, r2])
model = ALS.train(ratings, rank, numIter, lmbda)
predictions = model.predictAll(test)
print test.take(1)
print predictions.count()
print predictions
This works: predictions has a count of 1, and the code outputs:
[(2, 1)]
1
ParallelCollectionRDD[2691] at parallelize at PythonRDD.scala:423
However, when I try to use an RDD I created myself with the following code, it no longer appears to work:
model = ALS.train(ratings, rank, numIter, lmbda)
validation_data = validation.map(lambda xs: tuple(int(x) for x in xs))
predictions = model.predictAll(validation_data)
print validation_data.take(1)
print predictions.count()
print validation_data
This outputs:
[(61, 3864)]
0
PythonRDD[4018] at RDD at PythonRDD.scala:43
As you can see, predictAll comes back empty when passed the mapped RDD. The values going in have the same format in both cases. The only noticeable difference I can see is that the first example uses parallelize and produces a ParallelCollectionRDD, whereas the second uses a map, which produces a PythonRDD. Does predictAll only work if passed a certain type of RDD? If so, is it possible to convert between RDD types? I'm not sure how to get this working.
There are two basic conditions under which MatrixFactorizationModel.predictAll may return an RDD with fewer items than the input: the user is missing from the training data, or the product is missing from the training data.
You can easily reproduce this behavior and check that it does not depend on how the RDD was created. First, let's use example data to build a model:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
def parse(s):
    # Each input line has the form "user,product,rating"
    x, y, z = s.split(",")
    return Rating(int(x), int(y), float(z))
ratings = (sc.textFile("data/mllib/als/test.data")
    .map(parse)
    .union(sc.parallelize([Rating(1, 5, 4.0)])))
model = ALS.train(ratings, 10, 10)
Next, let's see which products and users are present in the training data:
set(ratings.map(lambda r: r.product).collect())
## {1, 2, 3, 4, 5}
set(ratings.map(lambda r: r.user).collect())
## {1, 2, 3, 4}
Now let's create test data and check predictions:
valid_test = sc.parallelize([(2, 5), (1, 4), (3, 5)])
valid_test
## ParallelCollectionRDD[434] at parallelize at PythonRDD.scala:423
model.predictAll(valid_test).count()
## 3
So far so good. Next, let's map it using the same logic as in your code:
valid_test_ = valid_test.map(lambda xs: tuple(int(x) for x in xs))
valid_test_
## PythonRDD[497] at RDD at PythonRDD.scala:43
model.predictAll(valid_test_).count()
## 3
Still fine. Next, let's create invalid data and repeat the experiment:
invalid_test = sc.parallelize([
    (2, 6),  # No product in the training data
    (6, 1)   # No user in the training data
])
invalid_test
## ParallelCollectionRDD[500] at parallelize at PythonRDD.scala:423
model.predictAll(invalid_test).count()
## 0
invalid_test_ = invalid_test.map(lambda xs: tuple(int(x) for x in xs))
model.predictAll(invalid_test_).count()
## 0
As expected there are no predictions for invalid input.
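One way to picture why those pairs vanish is the toy sketch below. It is plain Python, not Spark code, and it rests on my assumption that predictAll effectively inner-joins the requested (user, product) pairs with the learned user and product factors. The names predict_all, user_factors and product_factors are hypothetical, and the factor values are made up for illustration:

```python
def predict_all(user_factors, product_factors, pairs):
    """Toy stand-in for predictAll with inner-join semantics (not Spark code)."""
    predictions = []
    for user, product in pairs:
        u = user_factors.get(user)
        p = product_factors.get(product)
        if u is None or p is None:
            # No learned factors for this user or product:
            # the pair is silently dropped, as in an inner join.
            continue
        score = sum(a * b for a, b in zip(u, p))  # dot product of the factors
        predictions.append((user, product, score))
    return predictions


# Hypothetical factors, as if learned for users 1-2 and products 1 and 5.
user_factors = {1: [1.0, 0.0], 2: [0.5, 0.5]}
product_factors = {1: [2.0, 0.0], 5: [0.0, 4.0]}

pairs = [(2, 5), (2, 6), (6, 1)]  # product 6 and user 6 were never seen
result = predict_all(user_factors, product_factors, pairs)
print(len(pairs), len(result))  # 3 requested pairs, 1 prediction
```

Under that assumption nothing raises an error for an unknown id; the output is simply shorter than the input, which matches the counts above.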
Finally, you can confirm this is really the case by using the ML model, whose training and prediction are completely independent of the Python code:
from pyspark.ml.recommendation import ALS as MLALS
model_ml = MLALS(rank=10, maxIter=10).fit(
    ratings.toDF(["user", "item", "rating"])
)
model_ml.transform((valid_test + invalid_test).toDF(["user", "item"])).show()
## +----+----+----------+
## |user|item|prediction|
## +----+----+----------+
## |   6|   1|       NaN|
## |   1|   4| 1.0184212|
## |   2|   5| 4.0041084|
## |   3|   5|0.40498763|
## |   2|   6|       NaN|
## +----+----+----------+
As you can see, no corresponding user / item in the training data means no prediction.
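If you want to see which pairs in your own validation set are affected, a small check along these lines may help. This is only a sketch: split_by_coverage is a hypothetical helper, and it works on plain Python collections, so it assumes the user and product ids have already been collected to the driver, as in the set(...) calls above.

```python
def split_by_coverage(pairs, known_users, known_products):
    """Split (user, product) pairs by whether both ids appear in the training data."""
    covered, uncovered = [], []
    for user, product in pairs:
        if user in known_users and product in known_products:
            covered.append((user, product))
        else:
            uncovered.append((user, product))
    return covered, uncovered


# Ids taken from the example training data used in this answer.
known_users = {1, 2, 3, 4}
known_products = {1, 2, 3, 4, 5}

pairs = [(2, 5), (1, 4), (3, 5), (2, 6), (6, 1)]
covered, uncovered = split_by_coverage(pairs, known_users, known_products)
print(covered)    # pairs that can get a prediction
print(uncovered)  # pairs with an unseen user or product
```

With an RDD, the same membership test could serve as the predicate for filter, assuming the id sets are small enough to ship to the executors.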