apache-spark, apache-spark-sql, apache-spark-ml

Spark pipeline combining VectorAssembler and HashingTF transformers


Let's define a Spark pipeline that assembles a few columns together and then applies feature hashing:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}

val df = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0))).toDF("colx", "coly", "colz")
val va = new VectorAssembler().setInputCols(Array("colx", "coly", "colz")).setOutputCol("ft")
val hashIt = new HashingTF().setInputCol("ft").setOutputCol("ft2")
val pipeline = new Pipeline().setStages(Array(va, hashIt))

Fitting the pipeline with pipeline.fit(df) throws:

java.lang.IllegalArgumentException: requirement failed: The input column must be ArrayType, but got org.apache.spark.mllib.linalg.VectorUDT@f71b0bce

Is there a transformer that will allow VectorAssembler and HashingTF to work together?


Solution

  • Personally, I wouldn't even use the Pipeline API for this purpose; the array function is enough. HashingTF expects an ArrayType input column, not the Vector that VectorAssembler produces, so build the array column directly:

    import org.apache.spark.ml.feature.HashingTF
    import org.apache.spark.sql.functions.array
    import sqlContext.implicits._   // for the 'colx symbol-to-Column syntax

    val df = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0)))
                   .toDF("colx", "coly", "colz")
                   .withColumn("ft", array('colx, 'coly, 'colz))

    val hashIt = new HashingTF().setInputCol("ft").setOutputCol("ft2")
    val res = hashIt.transform(df)

    res.show(false)
    // +----+----+----+---------------+------------------------------+
    // |colx|coly|colz|ft             |ft2                           |
    // +----+----+----+---------------+------------------------------+
    // |0.0 |1.0 |2.0 |[0.0, 1.0, 2.0]|(262144,[0,1,2],[1.0,1.0,1.0])|
    // |3.0 |4.0 |5.0 |[3.0, 4.0, 5.0]|(262144,[3,4,5],[1.0,1.0,1.0])|
    // +----+----+----+---------------+------------------------------+
    

    As a follow-up, to generalize the array call when the DataFrame has more than a handful of columns, the following collects every column into a single array column:

    val df2 = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0)))
                    .toDF("colx", "coly", "colz")
    val cols = df2.columns.map(df2(_))
    df2.withColumn("ft", array(cols: _*)).show()

    // +----+----+----+---------------+
    // |colx|coly|colz|             ft|
    // +----+----+----+---------------+
    // | 0.0| 1.0| 2.0|[0.0, 1.0, 2.0]|
    // | 3.0| 4.0| 5.0|[3.0, 4.0, 5.0]|
    // +----+----+----+---------------+
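
    If you do want to keep everything inside a single Pipeline, one option is to swap VectorAssembler for a SQLTransformer stage that builds the array column in SQL. This is only a sketch, assuming Spark 1.6+ (where SQLTransformer is available), the original three-column df from the question, and a hypothetical stage name makeArray:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{HashingTF, SQLTransformer}

    // __THIS__ is SQLTransformer's placeholder for the input DataFrame;
    // makeArray is just an illustrative name for the stage
    val makeArray = new SQLTransformer()
      .setStatement("SELECT *, array(colx, coly, colz) AS ft FROM __THIS__")
    val hashIt = new HashingTF().setInputCol("ft").setOutputCol("ft2")

    val pipeline = new Pipeline().setStages(Array(makeArray, hashIt))
    val res = pipeline.fit(df).transform(df)

    This keeps the array step serialized with the rest of the fitted model, at the cost of hard-coding the column list in the SQL statement.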