I am implementing k-means and I want to create the new centroids. But the mapping leaves one element out! However, when K
is of a smaller value, like 15, it will work fine.
Based on that code I have:
val K = 25 // number of clusters
val data = sc.textFile("dense.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val count = data.count()
println("Number of records " + count)
var centroids = data.takeSample(false, K, 42).map(x => x._2)
do {
var closest = data.map(p => (closestPoint(p._2, centroids), p._2))
var pointsGroup = closest.groupByKey()
pointsGroup.foreach { println }
var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
//var newCentroids = pointsGroup.mapValues(ps => average(ps)).collectAsMap() this will produce an error
for (i <- 0 until K) {
tempDist += centroids(i).squaredDist(newCentroids(i))
and in the for loop, I will get the error that it won't find the element (which is not always the same and it depends on K
java.util.NoSuchElementException: key not found: 2
Output before the error comes up:
Number of records 27776
ShuffledRDD[5] at groupByKey at kmeans.scala:72
24 <- IT SHOULD BE 25
What is the problem?
>>> println(newCentroids)
Map(23 -> (-0.0050852959701492536, 0.005512245104477607, -0.004460964477611937), 17 -> (-0.005459583045685268, 0.0029015278781725795, -8.451635532994901E-4), 8 -> (-4.691649213483123E-4, 0.0025375451685393366, 0.0063490755505617585), 11 -> (0.30361112034069937, -0.0017342255382385204, -0.005751167731061906), 20 -> (-5.839587918939964E-4, -0.0038189763756820145, -0.007067070459859708), 5 -> (-0.3787612396704685, -0.005814121628643806, -0.0014961713117870657), 14 -> (0.0024755681263616547, 0.0015191503267973836, 0.003411769193899781), 13 -> (-0.002657690932944597, 0.0077671050923225635, -0.0034652379980563263), 4 -> (-0.006963114731610361, 1.1751361829025871E-4, -0.7481135105367823), 22 -> (0.015318187079953534, -1.2929035958285013, -0.0044176372190034684), 7 -> (-0.002321059060773483, -0.006316359116022083, 0.006164669723756913), 16 -> (0.005341800955165691, -0.0017540737037037035, 0.004066574093567247), 1 -> (0.0024547379611650484, 0.0056298656504855955, 0.002504618082524296), 10 -> (3.421068671121009E-4, 0.0045169004751299275, 5.696239049740164E-4), 19 -> (-0.005453716071428539, -0.001450277556818192, 0.003860007248376626), 9 -> (-0.0032921685273631807, 1.8477108457711313E-4, -0.003070412228855717), 18 -> (-0.0026803160958904053, 0.00913904078767124, -0.0023528013698630146), 3 -> (0.005750011594202901, -0.003607098309178754, -0.003615918896940412), 21 -> (0.0024925166025641056, -0.0037607353461538507, -2.1588444871794858E-4), 12 -> (-7.920202960526356E-4, 0.5390774232894769, -4.928884539473694E-4), 15 -> (-0.0018608492323232324, -0.006973787272727284, -0.0027266663434343404), 24 -> (6.151173211963486E-4, 7.081812613784045E-4, 5.612962808842611E-4), 6 -> (0.005323933953732931, 0.0024014750473186123, -2.969338590956889E-4), 0 -> (-0.0015991676750160377, -0.003001317289659613, 0.5384176139563245))
Question with relevant error: spark scala throws java.util.NoSuchElementException: key not found: 0 exception
After the observation of zero323 that two centroids were the same, I changed the code so that all the centroids are unique. However, the behaviour remains the same. For that reason, I suspect that closestPoint()
may return the same index for two centroids. Here is the function:
def closestPoint(p: Vector, centers: Array[Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
for (i <- 0 until centers.length) {
val tempDist = p.squaredDist(centers(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
return bestIndex
How to get away with this? I am running the code like I describe in Spark cluster.
It can happen in the "E-step" (the assignment of points to cluster-indices is analogous to the E-step of the EM algorithm) that one of your indices will not be assigned any points. If this happens then you need to have a way of associating that index with some point, otherwise you're going to wind up with fewer clusters after the "M-step" (the assignment of centroids to the indices is analogous to the M-step of the EM algorithm.) Something like this should probably work:
val newCentroids = {
val temp = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
val nMissing = K - temp.size
val sample = data.takeSample(false, nMissing, seed)
var c = -1
(for (i <- 0 until K) yield {
val point = temp.getOrElse(i, {c += 1; sample(c) })
(i, point)
Just substitute that code for the line you are currently using to compute newCentroids
There are other ways of dealing with this issue and the approach above is probably not the best (is it a good idea to be calling takeSample
multiple times, once for each iteration of the the k-means algorithm? what if data
contains a lot of repeated values?, etc.), but it is a simple starting point.
By the way, you might want to think about how you can replace the groupByKey
with a reduceByKey
Note: For the curious, here's a reference describing the similarities between the EM-algorithm and the k-means algorithm: http://papers.nips.cc/paper/989-convergence-properties-of-the-k-means-algorithms.pdf.