Search code examples
scalaapache-sparkapache-spark-sqlapache-spark-mllibapache-spark-ml

OneHotEncoder in Spark Dataframe in Pipeline


I've been trying to get an example running in Spark and Scala with the adult dataset .

Using Scala 2.11.8 and Spark 1.6.1.

The problem (for now) lies in the amount of categorical features in that dataset that all need to be encoded to numbers before a Spark ML algorithm can do its job..

So far I have this:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Adult {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Adult example").setMaster("local[*]")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)

    val data = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .load("src/main/resources/adult.data")

    val categoricals = data.dtypes filter (_._2 == "StringType")
    val encoders = categoricals map (cat => new OneHotEncoder().setInputCol(cat._1).setOutputCol(cat._1 + "_encoded"))
    val features = data.dtypes filterNot (_._1 == "label") map (tuple => if(tuple._2 == "StringType") tuple._1 + "_encoded" else tuple._1)

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)
    val pipeline = new Pipeline()
      .setStages(encoders ++ Array(lr))

    val model = pipeline.fit(training)
  }
}

However, this doesn't work. Calling pipeline.fit still contains the original string features and thus throws an exception. How can I remove these "StringType" columns in a pipeline? Or maybe I'm doing it completely wrong, so if someone has a different suggestion I'm happy to all input :).

The reason why I choose to follow this flow is because I have an extensive background in Python and Pandas, but am trying to learn both Scala and Spark.


Solution

  • There is one thing that can be rather confusing here if you're used to higher level frameworks. You have to index the features before you can use encoder. As it is explained in the API docs:

    one-hot encoder (...) maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index.

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}
    
    val df = Seq((1L, "foo"), (2L, "bar")).toDF("id", "x")
    
    val categoricals = df.dtypes.filter (_._2 == "StringType") map (_._1)
    
    val indexers = categoricals.map (
      c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
    )
    
    val encoders = categoricals.map (
      c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc")
    )
    
    val pipeline = new Pipeline().setStages(indexers ++ encoders)
    
    val transformed = pipeline.fit(df).transform(df)
    transformed.show
    
    // +---+---+-----+-------------+
    // | id|  x|x_idx|        x_enc|
    // +---+---+-----+-------------+
    // |  1|foo|  1.0|    (1,[],[])|
    // |  2|bar|  0.0|(1,[0],[1.0])|
    // +---+---+-----+-------------+
    

    As you can see there is no need to drop string columns from the pipeline. In practice OneHotEncoder will accept numeric column with NominalAttribute, BinaryAttribute or missing type attribute.