I am implementing k-means and I want to create the new centroids. But the mapping leaves one element out! However, when K
is of a smaller value, like 15, it will work fine.
Based on that code I have:
val K = 25 // number of clusters
val data = sc.textFile("dense.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val count = data.count()
println("Number of records " + count)
var centroids = data.takeSample(false, K, 42).map(x => x._2)
do {
var closest = data.map(p => (closestPoint(p._2, centroids), p._2))
var pointsGroup = closest.groupByKey()
println(pointsGroup)
pointsGroup.foreach { println }
var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
//var newCentroids = pointsGroup.mapValues(ps => average(ps)).collectAsMap() this will produce an error
println(centroids.size)
println(newCentroids.size)
for (i <- 0 until K) {
tempDist += centroids(i).squaredDist(newCentroids(i))
}
..
and in the for loop, I will get the error that it won't find the element (which is not always the same and it depends on K
:
java.util.NoSuchElementException: key not found: 2
Output before the error comes up:
Number of records 27776
ShuffledRDD[5] at groupByKey at kmeans.scala:72
25
24 <- IT SHOULD BE 25
What is the problem?
>>> println(newCentroids)
Map(23 -> (-0.0050852959701492536, 0.005512245104477607, -0.004460964477611937), 17 -> (-0.005459583045685268, 0.0029015278781725795, -8.451635532994901E-4), 8 -> (-4.691649213483123E-4, 0.0025375451685393366, 0.0063490755505617585), 11 -> (0.30361112034069937, -0.0017342255382385204, -0.005751167731061906), 20 -> (-5.839587918939964E-4, -0.0038189763756820145, -0.007067070459859708), 5 -> (-0.3787612396704685, -0.005814121628643806, -0.0014961713117870657), 14 -> (0.0024755681263616547, 0.0015191503267973836, 0.003411769193899781), 13 -> (-0.002657690932944597, 0.0077671050923225635, -0.0034652379980563263), 4 -> (-0.006963114731610361, 1.1751361829025871E-4, -0.7481135105367823), 22 -> (0.015318187079953534, -1.2929035958285013, -0.0044176372190034684), 7 -> (-0.002321059060773483, -0.006316359116022083, 0.006164669723756913), 16 -> (0.005341800955165691, -0.0017540737037037035, 0.004066574093567247), 1 -> (0.0024547379611650484, 0.0056298656504855955, 0.002504618082524296), 10 -> (3.421068671121009E-4, 0.0045169004751299275, 5.696239049740164E-4), 19 -> (-0.005453716071428539, -0.001450277556818192, 0.003860007248376626), 9 -> (-0.0032921685273631807, 1.8477108457711313E-4, -0.003070412228855717), 18 -> (-0.0026803160958904053, 0.00913904078767124, -0.0023528013698630146), 3 -> (0.005750011594202901, -0.003607098309178754, -0.003615918896940412), 21 -> (0.0024925166025641056, -0.0037607353461538507, -2.1588444871794858E-4), 12 -> (-7.920202960526356E-4, 0.5390774232894769, -4.928884539473694E-4), 15 -> (-0.0018608492323232324, -0.006973787272727284, -0.0027266663434343404), 24 -> (6.151173211963486E-4, 7.081812613784045E-4, 5.612962808842611E-4), 6 -> (0.005323933953732931, 0.0024014750473186123, -2.969338590956889E-4), 0 -> (-0.0015991676750160377, -0.003001317289659613, 0.5384176139563245))
Question with relevant error: spark scala throws java.util.NoSuchElementException: key not found: 0 exception
EDIT:
After the observation of zero323 that two centroids were the same, I changed the code so that all the centroids are unique. However, the behaviour remains the same. For that reason, I suspect that closestPoint()
may return the same index for two centroids. Here is the function:
def closestPoint(p: Vector, centers: Array[Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
for (i <- 0 until centers.length) {
val tempDist = p.squaredDist(centers(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
}
}
return bestIndex
}
How to get away with this? I am running the code like I describe in Spark cluster.
It can happen in the "E-step" (the assignment of points to cluster-indices is analogous to the E-step of the EM algorithm) that one of your indices will not be assigned any points. If this happens then you need to have a way of associating that index with some point, otherwise you're going to wind up with fewer clusters after the "M-step" (the assignment of centroids to the indices is analogous to the M-step of the EM algorithm.) Something like this should probably work:
val newCentroids = {
val temp = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
val nMissing = K - temp.size
val sample = data.takeSample(false, nMissing, seed)
var c = -1
(for (i <- 0 until K) yield {
val point = temp.getOrElse(i, {c += 1; sample(c) })
(i, point)
}).toMap
}
Just substitute that code for the line you are currently using to compute newCentroids
.
There are other ways of dealing with this issue and the approach above is probably not the best (is it a good idea to be calling takeSample
multiple times, once for each iteration of the the k-means algorithm? what if data
contains a lot of repeated values?, etc.), but it is a simple starting point.
By the way, you might want to think about how you can replace the groupByKey
with a reduceByKey
.
Note: For the curious, here's a reference describing the similarities between the EM-algorithm and the k-means algorithm: http://papers.nips.cc/paper/989-convergence-properties-of-the-k-means-algorithms.pdf.