I have implemented the k-means algorithm in Scala as follows.
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def clustering(clustnum: Int, iternum: Int, parsedData: RDD[Vector]): Unit = {
  // Train the k-means model
  val clusters = KMeans.train(parsedData, clustnum, iternum)
  println("The cluster centers of each column for " + clustnum + " clusters and " + iternum + " iterations are: ")
  clusters.clusterCenters.foreach(println)
  // Assign each row to its nearest cluster
  val predictions = clusters.predict(parsedData)
  predictions.collect()
}
I know how to print the cluster centers of each cluster, but is there a function in Scala that prints which rows have been assigned to which cluster?
The data I am working with contains rows of float values, with each row having an ID. It has around 34 columns and around 200 rows. I am working with Spark in Scala.
I need to be able to see the result, e.g. that Id_1 is in cluster 1, and so on.
Edit: I was able to do this much:
println(clustnum + " clusters and " + iternum + " iterations")
val vectorsAndClusterIdx = parsedData.map { point =>
  val prediction = clusters.predict(point)
  (point.toString, prediction)
}
vectorsAndClusterIdx.collect().foreach(println)
It prints the cluster ID and the row that was added to that cluster. The row is shown as a string, and the cluster ID is printed after it:
([1.0,1998.0,1.0,1.0,1.0,1.0,14305.0,39567.0,1998.0,23.499,25.7,27.961,29.04,28.061,26.171,24.44,24.619,24.529,24.497,23.838,22.322,1998.0,0.0,0.007,0.007,96.042,118.634,61.738,216.787,262.074,148.697,216.564,49.515,28.098],4)
([2.0,1998.0,1.0,1.0,2.0,1.0,185.0,2514.0,1998.0,23.499,25.7,27.961,29.04,28.061,26.171,24.44,24.619,24.529,24.497,23.838,22.322,1998.0,0.0,0.007,0.007,96.042,118.634,61.738,216.787,262.074,148.697,216.564,49.515,28.098],0)
([3.0,1998.0,1.0,1.0,2.0,2.0,27.0,272.0,1998.0,23.499,25.7,27.961,29.04,28.061,26.171,24.44,24.619,24.529,24.497,23.838,22.322,1998.0,0.0,0.007,0.007,96.042,118.634,61.738,216.787,262.074,148.697,216.564,49.515,28.098],0)
But is there some way to print just the row ID and the cluster ID?
Would using DataFrames help me here?
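One way to get just the ID and the cluster, assuming the row ID is the first element of each vector, is to map each point to a pair of its first component and the prediction instead of the whole vector, e.g. `parsedData.map(p => (p(0).toLong, clusters.predict(p)))`. Below is a sketch of the same pairing logic on plain Scala collections; `pairIdsWithClusters` and the stub predictor are illustrative, not part of MLlib:

```scala
// Sketch: pair each row's ID (assumed to be the first column) with its cluster.
// `predict` here is a stand-in for clusters.predict; on an RDD the same map
// would be: parsedData.map(p => (p(0).toLong, clusters.predict(p)))
def pairIdsWithClusters(rows: Seq[Array[Double]],
                        predict: Array[Double] => Int): Seq[(Long, Int)] =
  rows.map(row => (row(0).toLong, predict(row)))

// Toy rows: first element is the ID, the rest are features.
val rows = Seq(
  Array(1.0, 14305.0, 39567.0),
  Array(2.0, 185.0, 2514.0),
  Array(3.0, 27.0, 272.0)
)

// Stub predictor standing in for a trained KMeansModel.
val assignments = pairIdsWithClusters(rows, r => if (r(1) > 1000) 4 else 0)
assignments.foreach { case (id, c) => println(s"Id_$id is in cluster $c") }
```

This prints one `Id_X is in cluster Y` line per row, which is the format asked for above.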
You can use the predict() function of KMeansModel.
Have a look at the documentation: http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel
In your code:
KMeans.train(parsedData, clustnum, iternum)
returns a KMeansModel object.
So, you can do this:
val predictions = clusters.predict(parsedData)
and get a MapPartitionsRDD as a result.
predictions.collect()
gives you an Array with the cluster index assignments.
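To turn that Array of assignments back into `Id_X is in cluster Y` lines, one option is to zip the collected rows with the collected predictions, relying on both preserving the input order. A sketch with made-up values (with Spark you would obtain `rows` from something like `parsedData.collect()` and `predicted` from `predictions.collect()`):

```scala
// Illustrative values only: the first column of each row is the ID, and
// `predicted` stands in for the Array returned by predictions.collect().
val rows = Array(Array(1.0, 14305.0), Array(2.0, 185.0), Array(3.0, 27.0))
val predicted = Array(4, 0, 0)

// Zip rows with their cluster assignments and print ID -> cluster.
rows.zip(predicted).foreach { case (row, cluster) =>
  println(s"Id_${row(0).toLong} is in cluster $cluster")
}
```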