Tags: python, apache-spark, pyspark, apache-spark-mllib, py4j

Pyspark Py4j IllegalArgumentException with spark.createDataFrame and pyspark.ml.clustering


Let me disclose the full background of my problem first; a simplified MWE that recreates the same issue is at the bottom. Feel free to skip my rambling about the setup and go straight to the last section.

The Actors in my Original Problem:

  1. A Spark dataframe data read from Amazon S3, with a column scaled_features that is ultimately the result of a VectorAssembler operation followed by a MinMaxScaler (see the sketch right after this list for what that preprocessing might look like).
  2. A spark dataframe column pca_features that results from the above df column after a PCA like so:
from pyspark.mllib.linalg.distributed import RowMatrix

mat = RowMatrix(data.select('scaled_features').rdd.map(list))
pc = mat.computePrincipalComponents(2)
projected = mat.multiply(pc).rows.map(lambda x: (x, )).toDF().withColumnRenamed('_1', 'pca_features')
  3. Two instances of BisectingKMeans fitted to the feature columns of the two data frames above, like so:
kmeans_scaled = BisectingKMeans(featuresCol='scaled_features').setK(4).setSeed(1)
model1 = kmeans_scaled.fit(data)

kmeans_pca = BisectingKMeans(featuresCol='pca_features').setK(4).setSeed(1)
model2 = kmeans_pca.fit(projected)
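
For reference, here is a minimal sketch of the kind of preprocessing that produces the scaled_features column from item 1. The input dataframe raw_df and its column names are placeholders made up for illustration, not my actual data:

from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Collect the raw numeric columns into a single vector column ...
assembler = VectorAssembler(inputCols=['col_a', 'col_b', 'col_c'], outputCol='features')
assembled = assembler.transform(raw_df)

# ... and rescale every feature to the [0, 1] range.
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
data = scaler.fit(assembled).transform(assembled)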

The Issue:

While BisectingKMeans fits to scaled_features from my first data frame without issues, attempting a fit on the projected PCA features errors out with the following:

Py4JJavaError: An error occurred while calling o1413.fit.
: java.lang.IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types:
[struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>]
but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.

As you can see, Py4J complains that the data I'm passing is of a struct type that is, character for character, the first type in the list of allowed types.

Additional Debug Info:

I'm running Spark version 2.4.0.

Checking the dtypes yields data.dtypes: [('scaled_features', 'vector')] and projected.dtypes: [('pca_features', 'vector')]. The schema is the same for both dataframes as well; printing just one for reference:

root
 |-- scaled_features: vector (nullable = true)

Recreating the error (MWE):

It turns out that the same error can be recreated by building a simple data frame from a few DenseVectors (the columns in my original dfs are of vector type as well):

from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.clustering import BisectingKMeans

test_data = spark.createDataFrame([
    Row(test_features=DenseVector([43.0, 0.0, 200.0, 1.0, 1.0, 1.0, 0.0, 3.0])),
    Row(test_features=DenseVector([44.0, 0.0, 250.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([23.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([25.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0])),
    Row(test_features=DenseVector([19.0, 0.0, 200.0, 1.0, 0.0, 1.0, 0.0, 1.0])),
])

kmeans_test = BisectingKMeans(featuresCol='test_features').setK(4).setSeed(1)
model3 = kmeans_test.fit(test_data)

The last line results in the same error I'm facing in my original setup.

Can anyone explain this error and suggest a way to rectify it?


Solution

After a few more days of investigation, I was pointed to the (rather embarrassing) cause of the issue:

PySpark ships two machine learning libraries, pyspark.ml (DataFrame-based) and pyspark.mllib (the older, RDD-based one), and their vector types are not interchangeable: the DataFrame-based estimators in pyspark.ml expect vectors from pyspark.ml.linalg, not the look-alike vectors from pyspark.mllib.linalg. Replacing from pyspark.mllib.linalg import DenseVector with from pyspark.ml.linalg import DenseVector resolves all the issues.
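
For completeness, here is the MWE again with only the import changed; under Spark 2.4 this fit should go through without the IllegalArgumentException (assuming an active SparkSession named spark, as before):

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector       # ml, not mllib
from pyspark.ml.clustering import BisectingKMeans

test_data = spark.createDataFrame([
    Row(test_features=DenseVector([43.0, 0.0, 200.0, 1.0, 1.0, 1.0, 0.0, 3.0])),
    Row(test_features=DenseVector([44.0, 0.0, 250.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([23.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([25.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0])),
    Row(test_features=DenseVector([19.0, 0.0, 200.0, 1.0, 0.0, 1.0, 0.0, 1.0])),
])

# Both vector flavours print as 'vector' in dtypes and printSchema(),
# which is why the schemas looked identical. The underlying UDT class
# reveals the difference: with the ml import this should be
# pyspark.ml.linalg.VectorUDT, whereas the mllib import would have
# produced pyspark.mllib.linalg.VectorUDT.
print(type(test_data.schema['test_features'].dataType))

kmeans_test = BisectingKMeans(featuresCol='test_features').setK(4).setSeed(1)
model3 = kmeans_test.fit(test_data)    # should no longer raise the type error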