Tags: pyspark, cluster-analysis, apache-spark-mllib

How to properly label the original observations with their predicted clusters using KMeans in PySpark?


I'd like to understand how the k-means method works in PySpark. To that end, I've put together this small example:

In [119]: from pyspark.mllib.clustering import KMeans

In [120]: entry = [[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5],[5,5,5],[5,5,5],[1,1,1],[5,5,5]]

In [121]: rdd_entry = sc.parallelize(entry)

In [122]: clusters = KMeans.train(rdd_entry, k=5, maxIterations=10, initializationMode="random")

In [123]: rdd_labels = clusters.predict(rdd_entry)

In [125]: rdd_labels.collect()
Out[125]: [3, 1, 0, 0, 2, 2, 2, 3, 2]

In [126]: entry
Out[126]:
[[1, 1, 1],
 [2, 2, 2],
 [3, 3, 3],
 [4, 4, 4],
 [5, 5, 5],
 [5, 5, 5],
 [5, 5, 5],
 [1, 1, 1],
 [5, 5, 5]]

At first glance it seems that rdd_labels returns the cluster to which each observation belongs, following the order of the original RDD. Although that is evident in this example, how can I be sure of it when I'm working with 8 million observations?

Also, I'd like to know how to join rdd_entry and rdd_labels while preserving that order, so that each observation in rdd_entry is correctly labeled with its cluster. I tried a .join(), but it throws an error:

In [127]: rdd_total = rdd_entry.join(rdd_labels)

In [128]: rdd_total.collect()

TypeError: 'int' object has no attribute '__getitem__'
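
(For context: RDD .join() operates on pair RDDs of (key, value) tuples and keys on the first element, so joining an RDD of plain lists with an RDD of plain integers fails with this TypeError. A hedged sketch of a key-based join, using zipWithIndex to attach matching positional keys to the variables from the session above, would look like this:)

# Sketch (not from the original post): give both RDDs an explicit positional
# key so that a pair-RDD join becomes possible.
keyed_entry  = rdd_entry.zipWithIndex().map(lambda x: (x[1], x[0]))
keyed_labels = rdd_labels.zipWithIndex().map(lambda x: (x[1], x[0]))
rdd_total = keyed_entry.join(keyed_labels).values()   # e.g. ([1, 1, 1], 3)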

Solution

  • Hope it helps! (This solution is based on pyspark.ml.)

    from pyspark.ml.clustering import KMeans
    from pyspark.ml.feature import VectorAssembler
    
    #sample data
    df = sc.parallelize([[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5],[5,5,5],[5,5,5],[1,1,1],[5,5,5]]).\
        toDF(('col1','col2','col3'))
    
    vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
    vector_df = vecAssembler.transform(df)
    
    #kmeans clustering
    kmeans = KMeans(k=3, seed=1)
    model = kmeans.fit(vector_df)
    predictions = model.transform(vector_df)
    predictions.show()
    

    Output is:

    +----+----+----+-------------+----------+
    |col1|col2|col3|     features|prediction|
    +----+----+----+-------------+----------+
    |   1|   1|   1|[1.0,1.0,1.0]|         0|
    |   2|   2|   2|[2.0,2.0,2.0]|         0|
    |   3|   3|   3|[3.0,3.0,3.0]|         2|
    |   4|   4|   4|[4.0,4.0,4.0]|         1|
    |   5|   5|   5|[5.0,5.0,5.0]|         1|
    |   5|   5|   5|[5.0,5.0,5.0]|         1|
    |   5|   5|   5|[5.0,5.0,5.0]|         1|
    |   1|   1|   1|[1.0,1.0,1.0]|         0|
    |   5|   5|   5|[5.0,5.0,5.0]|         1|
    +----+----+----+-------------+----------+
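
    As a quick follow-up (a hedged sketch, not part of the original answer): the fitted pyspark.ml KMeansModel also exposes the learned centroids through clusterCenters(), which makes it easy to see which region of the data each value in the prediction column refers to.

    #inspect the learned centroids; the list index corresponds to the
    #values in the "prediction" column above
    for i, center in enumerate(model.clusterCenters()):
        print(i, center)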
    

    Although pyspark.ml offers the better approach, I thought of writing code to achieve the same result using pyspark.mllib (the trigger was the comment from @Muhammad). So here goes the solution based on pyspark.mllib:

    from pyspark.mllib.clustering import KMeans
    from pyspark.sql.functions import monotonically_increasing_id, row_number
    from pyspark.sql.window import Window
    from pyspark.sql.types import IntegerType
    
    #sample data
    rdd = sc.parallelize([[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5],[5,5,5],[5,5,5],[1,1,1],[5,5,5]])
    
    #K-Means example
    model = KMeans.train(rdd, k=3, seed=1)
    labels = model.predict(rdd)
    
    #add cluster label to the original data
    df1 = rdd.toDF(('col1','col2','col3')) \
             .withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
    df2 = spark.createDataFrame(labels, IntegerType()).toDF('label') \
               .withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
    df = df1.join(df2, on=["row_index"]).drop("row_index")
    df.show()
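
    A caveat worth noting (not in the original answer): Window.orderBy without a partitionBy pulls all rows into a single partition to compute row_number, which can hurt with 8 million observations. As a sketch of an alternative, because model.predict(rdd) maps the input element-wise, labels keeps the same partitioning as rdd, so rdd.zip(labels) can pair each observation with its cluster directly:

    #alternative pairing via zip: valid because both RDDs have the same
    #number of partitions and the same number of elements per partition
    labeled = rdd.zip(labels)   #e.g. ([1, 1, 1], 0)
    df_alt = labeled.map(lambda x: (x[0][0], x[0][1], x[0][2], x[1])) \
                    .toDF(('col1', 'col2', 'col3', 'label'))
    df_alt.show()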