
Spark DataFrame CountVectorizedModel Error With DataType String


I have the following piece of code that performs a simple action: converting a sparse vector to a dense vector. Here is what I have so far:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}
import org.apache.spark.ml.feature.CountVectorizerModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions._
import spark.implicits._

// Collect the distinct values of the ocean_proximity column
val distinctOceanProximities = dfRaw.select(col("ocean_proximity")).distinct().as[String].collect()

val cvmDF = new CountVectorizerModel(distinctOceanProximities)
  .setInputCol("ocean_proximity")
  .setOutputCol("sparseFeatures")
  .transform(dfRaw)
  
val exprs = (0 until distinctOceanProximities.size).map(i => $"features".getItem(i).alias(s"${distinctOceanProximities(i)}"))
val vecToSeq = udf((v: Vector) => v.toArray)

val df2 = cvmDF.withColumn("features", vecToSeq($"sparseFeatures")).select(exprs:_*)
df2.show()

When I run this script, I get the following error:

java.lang.IllegalArgumentException: requirement failed: Column ocean_proximity must be of type equal to one of the following types: [array<string>, array<string>] but was actually of type string.
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnTypes(SchemaUtils.scala:63)
  at org.apache.spark.ml.feature.CountVectorizerParams.validateAndTransformSchema(CountVectorizer.scala:97)
  at org.apache.spark.ml.feature.CountVectorizerParams.validateAndTransformSchema$(CountVectorizer.scala:95)
  at org.apache.spark.ml.feature.CountVectorizerModel.validateAndTransformSchema(CountVectorizer.scala:272)
  at org.apache.spark.ml.feature.CountVectorizerModel.transformSchema(CountVectorizer.scala:338)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
  at org.apache.spark.ml.feature.CountVectorizerModel.transform(CountVectorizer.scala:306)
  ... 101 elided

I think it is expecting an array of strings for this column, but I just have a plain String. Any ideas on how to fix this?
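
For reference, the input column really is a plain string, which is exactly what the requirement check complains about (a minimal check, assuming dfRaw is the same DataFrame as above):

// CountVectorizerModel requires array<string>, but the raw column is a plain string
dfRaw.select(col("ocean_proximity")).printSchema()   // shows: ocean_proximity: string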


Solution

  • It was pretty simple. All I had to do was convert the column from a String to an array of strings, like this:

    val oceanProximityAsArrayDF = dfRaw.withColumn("ocean_proximity", array("ocean_proximity"))
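
    For completeness, here is the rest of the snippet with that change in place (a sketch, assuming the same dfRaw, distinct-value list, and column names used in the question):

    import org.apache.spark.ml.feature.CountVectorizerModel
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Wrap the plain string column in a single-element array so the transformer accepts it
    val oceanProximityAsArrayDF = dfRaw.withColumn("ocean_proximity", array(col("ocean_proximity")))

    // Use the distinct category values as the model's fixed vocabulary
    val distinctOceanProximities = dfRaw.select(col("ocean_proximity")).distinct().as[String].collect()

    val cvmDF = new CountVectorizerModel(distinctOceanProximities)
      .setInputCol("ocean_proximity")
      .setOutputCol("sparseFeatures")
      .transform(oceanProximityAsArrayDF)

    // Convert the sparse vector to an array and expand it into one 0/1 column per category
    val vecToSeq = udf((v: Vector) => v.toArray)
    val exprs = distinctOceanProximities.indices.map(i => $"features".getItem(i).alias(distinctOceanProximities(i)))

    val df2 = cvmDF.withColumn("features", vecToSeq($"sparseFeatures")).select(exprs: _*)
    df2.show()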