Tags: java, scala, apache-spark, distributed-computing, rdd

Mapping of elements gone bad


I am implementing k-means and I want to create the new centroids, but the mapping leaves one element out! However, when K has a smaller value, like 15, it works fine.

Here is the code I have:

val K = 25 // number of clusters
// each line of dense.txt is "<id>#<vector>"
val data = sc.textFile("dense.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val count = data.count()
println("Number of records " + count)

// pick K random points as the initial centroids
var centroids = data.takeSample(false, K, 42).map(x => x._2)
var tempDist = 0.0
do {
  // assign every point to the index of its closest centroid
  var closest = data.map(p => (closestPoint(p._2, centroids), p._2))
  var pointsGroup = closest.groupByKey()
  println(pointsGroup)
  pointsGroup.foreach { println } // note: this prints on the executors, not the driver
  // the new centroid of each cluster is the average of its points
  var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
  //var newCentroids = pointsGroup.mapValues(ps => average(ps)).collectAsMap() // this will produce an error
  println(centroids.size)
  println(newCentroids.size)
  tempDist = 0.0 // total distance moved by the centroids in this iteration
  for (i <- 0 until K) {
    tempDist += centroids(i).squaredDist(newCentroids(i))
  }
  ..

In the for loop I get an error that a key is not found (the missing key is not always the same, and it depends on K):

java.util.NoSuchElementException: key not found: 2

Output before the error comes up:

Number of records 27776
ShuffledRDD[5] at groupByKey at kmeans.scala:72
25
24            <- IT SHOULD BE 25

What is the problem?


Output of println(newCentroids); note that key 2 is indeed missing:
Map(23 -> (-0.0050852959701492536, 0.005512245104477607, -0.004460964477611937), 17 -> (-0.005459583045685268, 0.0029015278781725795, -8.451635532994901E-4), 8 -> (-4.691649213483123E-4, 0.0025375451685393366, 0.0063490755505617585), 11 -> (0.30361112034069937, -0.0017342255382385204, -0.005751167731061906), 20 -> (-5.839587918939964E-4, -0.0038189763756820145, -0.007067070459859708), 5 -> (-0.3787612396704685, -0.005814121628643806, -0.0014961713117870657), 14 -> (0.0024755681263616547, 0.0015191503267973836, 0.003411769193899781), 13 -> (-0.002657690932944597, 0.0077671050923225635, -0.0034652379980563263), 4 -> (-0.006963114731610361, 1.1751361829025871E-4, -0.7481135105367823), 22 -> (0.015318187079953534, -1.2929035958285013, -0.0044176372190034684), 7 -> (-0.002321059060773483, -0.006316359116022083, 0.006164669723756913), 16 -> (0.005341800955165691, -0.0017540737037037035, 0.004066574093567247), 1 -> (0.0024547379611650484, 0.0056298656504855955, 0.002504618082524296), 10 -> (3.421068671121009E-4, 0.0045169004751299275, 5.696239049740164E-4), 19 -> (-0.005453716071428539, -0.001450277556818192, 0.003860007248376626), 9 -> (-0.0032921685273631807, 1.8477108457711313E-4, -0.003070412228855717), 18 -> (-0.0026803160958904053, 0.00913904078767124, -0.0023528013698630146), 3 -> (0.005750011594202901, -0.003607098309178754, -0.003615918896940412), 21 -> (0.0024925166025641056, -0.0037607353461538507, -2.1588444871794858E-4), 12 -> (-7.920202960526356E-4, 0.5390774232894769, -4.928884539473694E-4), 15 -> (-0.0018608492323232324, -0.006973787272727284, -0.0027266663434343404), 24 -> (6.151173211963486E-4, 7.081812613784045E-4, 5.612962808842611E-4), 6 -> (0.005323933953732931, 0.0024014750473186123, -2.969338590956889E-4), 0 -> (-0.0015991676750160377, -0.003001317289659613, 0.5384176139563245))

A related question with the same error: spark scala throws java.util.NoSuchElementException: key not found: 0 exception


EDIT:

After zero323 observed that two of the centroids were the same, I changed the code so that all the centroids are unique. However, the behaviour remains the same. For that reason, I suspect that closestPoint() may return the same index for two different centroids. Here is the function:

  // Returns the index of the centroid in `centers` that is closest to p.
  def closestPoint(p: Vector, centers: Array[Vector]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity
    for (i <- 0 until centers.length) {
      val tempDist = p.squaredDist(centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }
    bestIndex
  }
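
As a sanity check (this diagnostic is not in the code above), I can list which cluster indices actually receive points; closest is the RDD[(Int, Vector)] built inside the loop:

  // which cluster indices actually received at least one point?
  val assignedIndices = closest.map(_._1).distinct().collect().sorted
  println("Assigned indices: " + assignedIndices.mkString(", "))
  println("Missing indices: " +
    (0 until K).filterNot(assignedIndices.contains).mkString(", "))

Any index printed as missing corresponds to an empty cluster, and newCentroids will not contain that key.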

How can I work around this? I am running the code as described in Spark cluster.


Solution

  • It can happen in the "E-step" (the assignment of points to cluster indices is analogous to the E-step of the EM algorithm) that one of your indices is not assigned any points. If this happens, you need a way of associating that index with some point; otherwise you will wind up with fewer clusters after the "M-step" (the assignment of centroids to the indices is analogous to the M-step of the EM algorithm). Something like this should work:

    val newCentroids = {
      val temp = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
      val nMissing = K - temp.size
      val seed = 42L // any fixed seed works here
      // sample one fresh point for each empty cluster (keeping only the vector)
      val sample = data.takeSample(false, nMissing, seed).map(_._2)
      var c = -1
      (for (i <- 0 until K) yield {
        // use the computed centroid if the cluster was non-empty, otherwise take
        // the next sampled point (getOrElse evaluates the fallback only when
        // the key is missing)
        val point = temp.getOrElse(i, { c += 1; sample(c) })
        (i, point)
      }).toMap
    }
    

    Just substitute that code for the line you are currently using to compute newCentroids.

    There are other ways of dealing with this issue, and the approach above is probably not the best (is it a good idea to call takeSample multiple times, once for each iteration of the k-means algorithm? what if data contains a lot of repeated values? etc.), but it is a simple starting point. One alternative is sketched below.
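
    For example, one simple alternative (a sketch, not part of the code above) is to fall back to the previous centroid for any index that received no points, avoiding the extra takeSample call in each iteration:

    val newCentroids = {
      val temp = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
      // keep the old centroid wherever the cluster came out empty
      (for (i <- 0 until K) yield (i, temp.getOrElse(i, centroids(i)))).toMap
    }

    The trade-off is that an empty cluster stays where it was, while resampling gives it a fresh chance to capture points.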

    By the way, you might want to think about how you can replace the groupByKey with a reduceByKey.
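
    For instance, the per-cluster averages can be built from (sum, count) pairs with reduceByKey, so that partial sums are combined before the shuffle instead of moving every point across the network. This is a sketch, assuming the Vector type supports element-wise + and division by a scalar (as the Vector used in the classic Spark k-means example does):

    // per-cluster (sum, count) pairs, reduced locally before shuffling
    val newCentroids = closest
      .mapValues(p => (p, 1))
      .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
      .mapValues { case (sum, n) => sum / n.toDouble } // centroid = sum / count
      .collectAsMap()

    An index with no points still produces no entry here, so the empty-cluster fallback above is still needed.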

    Note: For the curious, here's a reference describing the similarities between the EM algorithm and the k-means algorithm: http://papers.nips.cc/paper/989-convergence-properties-of-the-k-means-algorithms.pdf.