In my Spark DataFrame I have a column which contains the output of a CountVectorizer
transformation; it is in sparse vector format. What I am trying to do is 'explode' this column into a dense vector and then into its individual components (so that it can be used for scoring by an external model).
I know there are 40 features in the column, so, following this example, I have tried:
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector
// convert sparse vector to a dense vector, and then to array<double>
val vecToSeq = udf((v: Vector) => v.toArray)
// Prepare a list of columns to create
val exprs = (0 until 40).map(i => $"_tmp".getItem(i).alias(s"exploded_col$i"))
testDF.select(vecToSeq($"features").alias("_tmp")).select(exprs:_*)
However, I get the weird error (see full error below):
data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;
Now it appears that the CountVectorizer may have created a vector of type 'ml.linalg.Vector', so I have alternatively tried importing:
import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}
And then I get this error:
Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row
I have also tried converting the ml vector by altering the UDF to:
val vecToSeq = udf((v: Vector) => org.apache.spark.mllib.linalg.Vectors.fromML(v.toDense).toArray )
And I get a similar "cannot be cast to org.apache.spark.sql.Row" error.
Can anyone tell me why this is not working? Is there an easier way to explode a sparse vector in a DataFrame into separate columns? I've spent hours on this and cannot figure it out.
EDIT: The schema shows the feature column just as a vector:
|-- features: vector (nullable = true)
Full error trace:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features)' due to data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;;
Project [UDF(features#325) AS _tmp#463]
. . .
org.apache.spark.sql.cassandra.CassandraSourceRelation@47eae91d
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
at uk.nominet.renewals.prediction_test$.prediction_test(prediction_test.scala:292)
at
It appears to be an issue with your import statements. As you noticed, CountVectorizer uses the vectors from the ml package, so all vector imports should come from that package as well. Make sure you do not have any imports from the older mllib package. This includes:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.DenseVector
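If it is unclear which package a vector column actually comes from, one way to check is to look at class names rather than the printed schema, since the error message above suggests both packages show up simply as "vector". The following is only a sketch: the `sample` DataFrame is a hypothetical stand-in for the CountVectorizer output, and the local `SparkSession` is created purely for illustration.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

// Local session for illustration only; reuse your existing session in a real job.
val spark = SparkSession.builder().master("local[1]").appName("vector-type-check").getOrCreate()
import spark.implicits._

// Hypothetical one-row DataFrame standing in for the CountVectorizer output.
val sample = Seq((1L, Vectors.sparse(4, Array(0, 2), Array(1.0, 1.0)))).toDF("id", "features")

// Inspect the fully qualified class names of the column type and of a stored value.
val udtClass = sample.schema("features").dataType.getClass.getName
val valueClass = sample.select("features").head.get(0).getClass.getName
println(udtClass)
println(valueClass)
```

If either name starts with `org.apache.spark.mllib`, the column holds the older vector type and the UDF parameter type has to match it.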
Some methods are only present in the mllib package, so if you actually need that type of vector, you can rename it on import (otherwise its name clashes with the ml vectors). For example:
import org.apache.spark.mllib.linalg.{Vector => mllibVector}
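With renamed imports both vector types can coexist in one file, and you can convert between them where needed. A minimal sketch, assuming Spark 2.0 or later (where `Vectors.fromML` and `asML` are available); the alias names `MLlibVector` and `MLlibVectors` are just illustrative choices:

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vector => MLlibVector, Vectors => MLlibVectors}

// A sparse ml vector, like the ones CountVectorizer produces.
val mlVec: Vector = Vectors.sparse(4, Array(0, 2, 3), Array(1.0, 1.0, 1.0))

// ml -> mllib, for code that only accepts the older type.
val oldVec: MLlibVector = MLlibVectors.fromML(mlVec)

// mllib -> ml, to return to the DataFrame-based API.
val roundTrip: Vector = oldVec.asML
```

Note that a UDF applied to the `features` column should still declare its parameter as the ml `Vector`; the conversion belongs inside the UDF body, not in its signature.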
After fixing all imports, your code should run. Test:
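import org.apache.spark.ml.feature.CountVectorizer
import spark.implicits._ // assumes a SparkSession named `spark`; needed for toDF and $"..."

val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val countVec = new CountVectorizer().setInputCol("words").setOutputCol("features")
val testDF = countVec.fit(df).transform(df)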
This gives a test DataFrame as follows:
+---+--------------------+--------------------+
| id|               words|            features|
+---+--------------------+--------------------+
|  1|[word1, word2, wo...|(4,[0,2,3],[1.0,1...|
|  2|      [word2, word4]| (4,[0,1],[1.0,1.0])|
+---+--------------------+--------------------+
Now, to give each index its own column:
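import org.apache.spark.ml.linalg.Vector // the ml Vector, not the mllib one
import org.apache.spark.sql.functions.udf

// Convert the (sparse) vector to array<double>, then pick out each index.
val vecToSeq = udf((v: Vector) => v.toArray)
val exprs = (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df2 = testDF.withColumn("features", vecToSeq($"features")).select(exprs:_*)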
Resulting DataFrame:
+-------------+-------------+-------------+-------------+
|exploded_col0|exploded_col1|exploded_col2|exploded_col3|
+-------------+-------------+-------------+-------------+
|          1.0|          0.0|          1.0|          1.0|
|          1.0|          1.0|          0.0|          0.0|
+-------------+-------------+-------------+-------------+
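The number of columns (4 here, 40 in the question) does not have to be hardcoded. As a sketch, assuming you still have the fitted `CountVectorizerModel` at hand, the vector length can be read from its vocabulary; the local `SparkSession` and the name `exploded` are only for illustration, and the `id` column is kept so rows can be matched back to their source:

```scala
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Local session for illustration only; reuse your existing session in a real job.
val spark = SparkSession.builder().master("local[1]").appName("explode-vector").getOrCreate()
import spark.implicits._

val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val model = new CountVectorizer().setInputCol("words").setOutputCol("features").fit(df)
val testDF = model.transform(df)

// The output vector has one slot per vocabulary entry.
val numFeatures = model.vocabulary.length

val vecToSeq = udf((v: Vector) => v.toArray)
val exprs = (0 until numFeatures).map(i => col("_tmp").getItem(i).alias(s"exploded_col$i"))

// Keep `id` alongside the exploded feature columns.
val exploded = testDF
  .select(col("id"), vecToSeq(col("features")).alias("_tmp"))
  .select(col("id") +: exprs: _*)
```

If the model is not available, the length could instead be taken from one row of the data, for example `testDF.select("features").head.getAs[Vector](0).size`, at the cost of running a small job.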