Tags: apache-spark, apache-spark-sql, rdd, apache-spark-mllib, apache-spark-ml

Formatting data for Spark ML


I'm new to Spark and Spark ML. I generated some data with the function KMeansDataGenerator.generateKMeansRDD, but I fail when formatting it so that it can be used by an ML algorithm (here, K-Means).

The error is

Exception in thread "main" java.lang.IllegalArgumentException: Data type ArrayType(DoubleType,false) is not supported.

It happens when using VectorAssembler.

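import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.mllib.util.KMeansDataGenerator

val generatedData = KMeansDataGenerator.generateKMeansRDD(sc, numPoints = 1000, k = 5, d = 3,
        r = 5, numPartitions = 1)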

val df = generatedData.toDF()

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("value"))
  .setOutputCol("features")
val df_final = assembler.transform(df).select("features")
df_final.show()

val nbClusters = 5
val nbIterations = 200
val kmeans = new KMeans().setK(nbClusters).setSeed(1L).setMaxIter(nbIterations)
val model = kmeans.fit(df_final)

Solution

  • VectorAssembler accepts only three types of columns:

    • DoubleType - double scalar, optionally with column metadata.
    • NumericType - arbitrary numeric, cast to double.
    • VectorUDT - vector column.

    You are trying to pass ArrayType(DoubleType), which is not supported. You should convert your data to a supported type (org.apache.spark.ml.linalg.DenseVector / VectorUDT seems like a reasonable choice). For example:

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.functions.{col, udf}
    
    // Spark 2.0+. For 1.x, use org.apache.spark.mllib.linalg.Vectors instead.
    // https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
    val seqAsVector = udf((xs: Seq[Double]) => Vectors.dense(xs.toArray))
    
    val df_final = df.withColumn("features", seqAsVector(col("value")))
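
To put the pieces together, here is a minimal end-to-end sketch, assuming Spark 2.x and a local SparkSession. The object name, helper names, app name and master setting are placeholders of mine, not from the question. Once "value" has been converted to a vector column, VectorAssembler should not be needed for a single input column, and KMeans is fitted on the converted DataFrame rather than on the original df.

```scala
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.util.KMeansDataGenerator
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical wrapper object; the names here are illustrative only.
object KMeansFormattingSketch {

  // Converts the array<double> column "value" into a vector column "features".
  def withFeatures(df: DataFrame): DataFrame = {
    val seqAsVector = udf((xs: Seq[Double]) => Vectors.dense(xs.toArray))
    df.withColumn("features", seqAsVector(col("value")))
  }

  // Fits K-Means on the "features" column with the settings from the question.
  def fitKMeans(features: DataFrame): KMeansModel =
    new KMeans()
      .setK(5)
      .setSeed(1L)
      .setMaxIter(200)
      .setFeaturesCol("features")
      .fit(features)

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, just for trying this out.
    val spark = SparkSession.builder()
      .appName("kmeans-formatting-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // RDD[Array[Double]] becomes a DataFrame with one array<double> column named "value".
    val generatedData = KMeansDataGenerator.generateKMeansRDD(
      spark.sparkContext, numPoints = 1000, k = 5, d = 3, r = 5.0, numPartitions = 1)
    val df = generatedData.toDF()

    val dfFinal = withFeatures(df)
    dfFinal.printSchema()

    val model = fitKMeans(dfFinal)
    model.clusterCenters.foreach(println)

    spark.stop()
  }
}
```

Keeping the conversion in a UDF leaves the original "value" column in place, which can be handy for debugging. If you don't need it, select only "features" before fitting.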