Tags: pyspark, amazon-emr

Convert KMeans "centres" output to PySpark dataframe


I'm running a K-means clustering model and I want to analyse the cluster centroids. However, the centers output is a LIST of my 20 centroids, with their coordinates (8 each) as ARRAYS. I need it as a dataframe, with clusters 1-20 as rows and their attribute values (centroid coordinates) as columns, like so:

c1 | 0.85 | 0.03 | 0.01 | 0.00 | 0.12 | 0.01 | 0.00 | 0.12 
c2 | 0.25 | 0.80 | 0.10 | 0.00 | 0.12 | 0.01 | 0.00 | 0.77
c3 | 0.05 | 0.10 | 0.00 | 0.82 | 0.00 | 0.00 | 0.22 | 0.00

The dataframe format is important because what I WANT to do is:

  • For each centroid, identify the 3 strongest attributes
  • Create a "name" for each of the 20 centroids that is a concatenation of the 3 most dominant traits in that centroid

For example:

c1 | milk_eggs_cheese
c2 | meat_milk_bread
c3 | toiletries_bread_eggs

This code is running in Zeppelin, on EMR 5.19 with Spark 2.4. The model works great, but this is the boilerplate code from the Spark documentation (https://spark.apache.org/docs/latest/ml-clustering.html#k-means), which produces the list-of-arrays output that I can't really use.

centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

This is an excerpt of the output I get.

Cluster Centers: 
[0.12391775 0.04282062 0.00368751 0.27282358 0.00533401 0.03389095
 0.04220946 0.03213536 0.00895981 0.00990327 0.01007891]
[0.09018751 0.01354349 0.0130329  0.00772877 0.00371508 0.02288211
 0.032301   0.37979978 0.002487   0.00617438 0.00610262]
[7.37626746e-02 2.02469798e-03 4.00944473e-04 9.62304581e-04
 5.98964859e-03 2.95190585e-03 8.48736175e-01 1.36797882e-03
 2.57451073e-04 6.13320072e-04 5.70559278e-04]
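
A quick type check on that output shows why it is awkward to work with - the centroids come back as numpy arrays, not plain Python lists:

type(centers)      # list
type(centers[0])   # numpy.ndarray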

Based on "How to convert a list of array to Spark dataframe" I have tried this:

df = sc.parallelize(centers).toDF(['fresh_items', 'wine_liquor', 'baby', 'cigarettes', 'fresh_meat', 'fruit_vegetables', 'bakery', 'toiletries', 'pets', 'coffee', 'cheese'])
df.show()

But this throws the following error:

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
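
For context, this is the standard NumPy error raised whenever a multi-element array is evaluated in a boolean context, for example:

import numpy as np
bool(np.array([0.1, 0.2]))   # ValueError: The truth value of an array with more than one element is ambiguous

Spark's row handling presumably hits a check like that when it is handed numpy arrays instead of plain lists, which is what the fix below works around.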

Solution

  • model.clusterCenters() gives you a list of numpy arrays, not a list of lists like in the answer you have linked. Just convert the numpy arrays to lists before creating the dataframe:

    # convert each numpy array to a plain Python list first
    bla = [e.tolist() for e in centers]
    df = sc.parallelize(bla).toDF(['fresh_items', 'wine_liquor', 'baby', 'cigarettes', 'fresh_meat', 'fruit_vegetables', 'bakery', 'toiletries', 'pets', 'coffee', 'cheese'])
    # or: df = spark.createDataFrame(bla, ['fresh_items', 'wine_liquor', 'baby', 'cigarettes', 'fresh_meat', 'fruit_vegetables', 'bakery', 'toiletries', 'pets', 'coffee', 'cheese'])
    df.show()
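
  • For the follow-up naming step (concatenating the 3 most dominant attributes per centroid), a minimal sketch, assuming the same column names as above and the existing spark session; this reproduces the c1 | milk_eggs_cheese style table described in the question:

    cols = ['fresh_items', 'wine_liquor', 'baby', 'cigarettes', 'fresh_meat',
            'fruit_vegetables', 'bakery', 'toiletries', 'pets', 'coffee', 'cheese']

    rows = []
    for i, center in enumerate(centers, start=1):
        # pair each attribute with its coordinate and keep the 3 largest
        top3 = sorted(zip(cols, center.tolist()), key=lambda kv: kv[1], reverse=True)[:3]
        rows.append(('c{}'.format(i), '_'.join(attr for attr, _ in top3)))

    names_df = spark.createDataFrame(rows, ['cluster', 'name'])
    names_df.show(truncate=False)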