Tags: scala, apache-spark, apache-spark-ml

One-hot encoding multiple variables with Spark 2.1.1


I'm required to use Spark 2.1.1 and have a simple ML use case where I fit a logistic regression to perform a classification based on both continuous and categorical variables.

I automatically detect categorical variables and index them in the ML pipeline. However, when I then try to apply one-hot encoding to each of the indexed variables (the oneHotEncodersStages value in the code below), I get a compile error when creating the pipeline:

  Error:(48, 118) type mismatch;
   found   : Array[java.io.Serializable]
   required: Array[_ <: org.apache.spark.ml.PipelineStage]
  Note: java.io.Serializable >: org.apache.spark.ml.PipelineStage, but class Array is invariant in type T.
  You may wish to investigate a wildcard type such as _ >: org.apache.spark.ml.PipelineStage. (SLS 3.2.10)
    val pipeline = new Pipeline().setStages(stringIndexerStages :+ oneHotEncodersStages :+ indexer :+ assembler :+ lr :+ indexToLabel)

I can't figure out how to fix this error. Any tips? Below is a minimal example that reproduces it:

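  import org.apache.spark.ml.Pipeline
  import org.apache.spark.ml.classification.LogisticRegression
  import org.apache.spark.ml.feature.{IndexToString, OneHotEncoder, StringIndexer, VectorAssembler}
  import spark.implicits._ // requires a SparkSession named spark in scope, e.g. in spark-shell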
  val df = Seq(
    ("automatic","Honda",200,"Cheap"),
    ("semi-automatic","Ford",240,"Expensive")
  ).toDF("cat_type","cat_brand","Speed","label")

  def onlyFeatureCols(c: String): Boolean = !(c matches "id|label") // Function to select only feature columns (omit id and label)
  def isCateg(c: String): Boolean = c.startsWith("cat")
  def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c
  def isIdx(c: String): Boolean = c.startsWith("idx")
  def oneHotNewCol(c: String): String = if (isIdx(c)) s"vec_${c}" else c

  val featuresNames = df.columns
    .filter(onlyFeatureCols)
    .map(categNewCol)

  val stringIndexerStages = df.columns.filter(isCateg)
    .map(c => new StringIndexer()
      .setInputCol(c)
      .setOutputCol(categNewCol(c))
      .fit(df.select(c))
    )

  val oneHotEncodersStages = df.columns.filter(isIdx)
    .map(c => new OneHotEncoder()
      .setInputCol(c)
      .setOutputCol(oneHotNewCol(c)))

  val indexer = new StringIndexer().setInputCol("label").setOutputCol("labels").fit(df)
  val indexToLabel = new IndexToString().setInputCol("prediction").setOutputCol("predicted_label").setLabels(indexer.labels)
  val assembler = new VectorAssembler().setInputCols(featuresNames).setOutputCol("features")
  val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("labels")

  // Fails to compile with the type mismatch quoted above
  val pipeline = new Pipeline().setStages(stringIndexerStages :+ oneHotEncodersStages :+ indexer :+ assembler :+ lr :+ indexToLabel)

Solution

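  • stringIndexerStages and oneHotEncodersStages are both arrays. stringIndexerStages :+ oneHotEncodersStages creates a new array in which the entire second array is appended as a single element, so the compiler infers the element type as java.io.Serializable (the common supertype of StringIndexerModel and Array). Because Array is invariant, Array[java.io.Serializable] is not an Array[_ <: PipelineStage], and setStages rejects it. Use "++" to concatenate the two arrays, and ":+" only to append individual stages: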

    val pipeline = new Pipeline().setStages(stringIndexerStages ++ oneHotEncodersStages :+ indexer :+  assembler :+ lr :+ indexToLabel)
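The difference between :+ and ++ can be reproduced without Spark. This is a minimal sketch using hypothetical stand-in types (Stage, Indexer, Encoder) and a hypothetical setStages function whose parameter only mirrors the Array[_ <: PipelineStage] shape from the error message; none of them are Spark classes. Stage extends java.io.Serializable so that the widened type matches the one in the error.

```scala
// Hypothetical stand-ins for Spark's stage hierarchy (not Spark classes)
trait Stage extends java.io.Serializable
class Indexer(val col: String) extends Stage
class Encoder(val col: String) extends Stage

object AppendVsConcat {
  // Hypothetical: mirrors the parameter shape of Pipeline.setStages in the error message
  def setStages(value: Array[_ <: Stage]): Int = value.length

  val indexers: Array[Indexer] = Array(new Indexer("cat_type"), new Indexer("cat_brand"))
  val encoders: Array[Encoder] = Array(new Encoder("idx_cat_type"), new Encoder("idx_cat_brand"))

  // :+ appends its argument as ONE element: the whole encoders array becomes a
  // single element, and the element type widens to java.io.Serializable
  val appended: Array[java.io.Serializable] = indexers :+ encoders

  // ++ concatenates element by element; this is parsed as (indexers ++ encoders) :+ label,
  // so the element type stays within Stage
  val stageCount: Int = setStages(indexers ++ encoders :+ new Indexer("label"))

  // setStages(indexers :+ encoders) // does not compile: Array[java.io.Serializable] is not an Array[_ <: Stage]

  def main(args: Array[String]): Unit = {
    println(appended.length) // 3: two indexers plus the encoders array as one element
    println(stageCount)      // 5: two indexers, two encoders, one label indexer
  }
}
```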
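Separately from the type error, the example code has a second problem: df.columns contains only cat_type, cat_brand, Speed and label, so df.columns.filter(isIdx) matches nothing and oneHotEncodersStages is empty; featuresNames also points the VectorAssembler at the idx_ columns rather than the one-hot vec_ columns. Below is a sketch of a complete pipeline that derives the idx_ and vec_ names from the categorical columns instead. It assumes Spark 2.1.x, where OneHotEncoder is a plain Transformer (the one-hot encoder API changed in Spark 2.3 and again in 3.0); the object name OneHotPipelineSketch and the run wrapper are illustrative only.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{IndexToString, OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

// Sketch assuming Spark 2.1.x, where OneHotEncoder is a Transformer
object OneHotPipelineSketch {
  def isCateg(c: String): Boolean = c.startsWith("cat")
  def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c
  def isIdx(c: String): Boolean = c.startsWith("idx")
  def oneHotNewCol(c: String): String = if (isIdx(c)) s"vec_${c}" else c

  // Fits the pipeline on the toy data and returns the columns of the transformed output
  def run(spark: SparkSession): Array[String] = {
    import spark.implicits._
    val df = Seq(
      ("automatic", "Honda", 200, "Cheap"),
      ("semi-automatic", "Ford", 240, "Expensive")
    ).toDF("cat_type", "cat_brand", "Speed", "label")

    val featureCols = df.columns.filterNot(_ matches "id|label")
    val categCols = df.columns.filter(isCateg)

    // cat_x -> idx_cat_x -> vec_idx_cat_x; non-categorical columns pass through unchanged
    val featuresNames = featureCols.map(categNewCol).map(oneHotNewCol)

    val stringIndexerStages = categCols.map(c =>
      new StringIndexer().setInputCol(c).setOutputCol(categNewCol(c)).fit(df.select(c)))

    // Derive encoder inputs from the categorical names: the idx_ columns do not exist in df yet
    val oneHotEncodersStages = categCols.map(categNewCol).map(c =>
      new OneHotEncoder().setInputCol(c).setOutputCol(oneHotNewCol(c)))

    val indexer = new StringIndexer().setInputCol("label").setOutputCol("labels").fit(df)
    val indexToLabel = new IndexToString()
      .setInputCol("prediction").setOutputCol("predicted_label").setLabels(indexer.labels)
    val assembler = new VectorAssembler().setInputCols(featuresNames).setOutputCol("features")
    val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("labels")

    // ++ concatenates the two stage arrays; :+ appends one stage at a time
    val pipeline = new Pipeline()
      .setStages(stringIndexerStages ++ oneHotEncodersStages :+ indexer :+ assembler :+ lr :+ indexToLabel)

    pipeline.fit(df).transform(df).columns
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration; in spark-shell, pass the existing spark session instead
    val spark = SparkSession.builder().master("local[*]").appName("one-hot-sketch").getOrCreate()
    try println(run(spark).mkString(", "))
    finally spark.stop()
  }
}
```

With the encoders built from the categorical column names, the transformed DataFrame should contain vec_idx_cat_type and vec_idx_cat_brand, and the assembler consumes those vectors instead of the raw indices.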