Tags: scala, apache-spark, linear-regression

Spark IllegalArgumentException: Column features must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>


I'm trying to use org.apache.spark.ml.regression.LinearRegression to fit my data. I've transformed the original RDD into a DataFrame and tried to feed it to the LinearRegression model.

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors // mllib, to match the VectorUDT in the schema below
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val parsedData = dataRDD.map{
  item =>
    val doubleArray = Array(item._1.toDouble, item._2.toDouble, item._3.toDouble)
    val features = Vectors.dense(doubleArray)
    Row(item._4.toDouble, features)
}

val schema = List(
  StructField("label", DoubleType, true),
  StructField("features", new org.apache.spark.mllib.linalg.VectorUDT, true)
)

val df = spark.createDataFrame(
  parsedData,
  StructType(schema)
)
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

val lr_model = lr.fit(df)

And here is what the dataframe looks like:

+---------+-------------+
|    label|     features|
+---------+-------------+
|      5.0|[0.0,1.0,0.0]|
|     20.0|[0.0,1.0,0.0]|
|    689.0|[0.0,1.0,0.0]|
|    627.0|[0.0,1.0,0.0]|
|    127.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|     76.0|[0.0,1.0,0.0]|
|      5.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|      0.0|[0.0,1.0,0.0]|
|      2.0|[0.0,1.0,0.0]|
|    329.0|[0.0,1.0,0.0]|
|2354115.0|[0.0,1.0,0.0]|
|      5.0|[0.0,1.0,0.0]|
|   4303.0|[0.0,1.0,0.0]|
+---------+-------------+

But fitting the model raised the error below.

java.lang.IllegalArgumentException: requirement failed: Column features must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.

The latter data type doesn't seem to differ from the required one at all. Can anyone help?


Solution

  • You are using org.apache.spark.ml.regression.LinearRegression (the newer spark.ml API) together with the old VectorUDT from spark.mllib (which is deprecated), and the two do not work together. The error message is confusing because both UDTs serialize to the same struct layout, so the "required" and "actual" types print identically.

    Replace new org.apache.spark.mllib.linalg.VectorUDT by new org.apache.spark.ml.linalg.VectorUDT and it should work.
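With that one change, the explicit-schema version of the code from the question would look like this (a sketch assuming the same dataRDD of tuples as above):

```scala
import org.apache.spark.ml.linalg.Vectors // ml, not mllib
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Build rows whose "features" are spark.ml vectors
val parsedData = dataRDD.map { item =>
  val features = Vectors.dense(item._1.toDouble, item._2.toDouble, item._3.toDouble)
  Row(item._4.toDouble, features)
}

// Use the spark.ml VectorUDT, which is what LinearRegression expects
val schema = StructType(List(
  StructField("label", DoubleType, true),
  StructField("features", new org.apache.spark.ml.linalg.VectorUDT, true)
))

val df = spark.createDataFrame(parsedData, schema)
```

Note the key point: both Vectors.dense and the VectorUDT must come from org.apache.spark.ml.linalg, so the vector instances and the declared column type agree.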

    Note that to avoid declaring the schema by hand, you can create the DataFrame with toDF (after importing spark's implicits) and let Spark infer the right type (org.apache.spark.ml.linalg.VectorUDT) for you:

    import org.apache.spark.ml.linalg.Vectors
    import spark.implicits._
    val df = dataRDD.map{ item =>
        val doubleArray = Array(item._1.toDouble, item._2.toDouble, item._3.toDouble)
        val features = Vectors.dense(doubleArray)
        (item._4.toDouble, features)
    }.toDF("label", "features")