Search code examples
scalaapache-sparkcluster-analysisibm-cloudk-means

How do I view the datapoints that are added to a cluster after applying K-Means algorithm?


I have implemented k-means algorithm in scala as follows.

def clustering(clustnum:Int,iternum:Int,parsedData: RDD[org.apache.spark.mllib.linalg.Vector]): Unit= {
val clusters = KMeans.train(parsedData, clustnum, iternum)

println("The Cluster centers of each column for "+clustnum+" clusters and "+iternum+" iterations are:- ")


clusters.clusterCenters.foreach(println) 

val predictions= clusters.predict(parsedData)

 predictions.collect()

}

I know how I can print cluster centers of each cluster but is there a function in scala which prints which rows have been added to which cluster?

The data I am working with contains rows of float values with each row having an ID. It has around 34 columns and around 200 rows. I am working on spark in scala.

I need to be able to see the result. As in Id_1 is in cluster 1 or so and so.

Edit : I was able to do this much

println(clustnum+" clusters and "+iternum+" iterations ")

val vectorsAndClusterIdx = parsedData.map{ point => 
val prediction = clusters.predict(point) 
(point.toString, prediction) 
} 

vectorsAndClusterIdx.collect().foreach(println)

It prints the cluster ID and the row that is added to the cluster

The row is shown as a string and the cluster ID is the printed after

([1.0,1998.0,1.0,1.0,1.0,1.0,14305.0,39567.0,1998.0,23.499,25.7,27.961,29.04,28.061,26.171,24.44,24.619,24.529,24.497,23.838,22.322,1998.0,0.0,0.007,0.007,96.042,118.634,61.738,216.787,262.074,148.697,216.564,49.515,28.098],4)
([2.0,1998.0,1.0,1.0,2.0,1.0,185.0,2514.0,1998.0,23.499,25.7,27.961,29.04,28.061,26.171,24.44,24.619,24.529,24.497,23.838,22.322,1998.0,0.0,0.007,0.007,96.042,118.634,61.738,216.787,262.074,148.697,216.564,49.515,28.098],0)
([3.0,1998.0,1.0,1.0,2.0,2.0,27.0,272.0,1998.0,23.499,25.7,27.961,29.04,28.061,26.171,24.44,24.619,24.529,24.497,23.838,22.322,1998.0,0.0,0.007,0.007,96.042,118.634,61.738,216.787,262.074,148.697,216.564,49.515,28.098],0)

But is there some way to just print the row ID and cluster ID only?

Would using dataframes help me here?


Solution

  • You can use the predict() function of KMeansModel.

    Have a look at the documentation: http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel

    In your code:

    KMeans.train(parsedData, clustnum, iternum) 
    

    returns a KMeansModel object.

    So, you can do this:

    val predictions = clusters.predict(parsedData)
    

    and get a MapPartitionsRDD as result.

    predictions.collect()
    

    gives you an Array with the cluster index assignments.