My original schema contains many MapType columns that I want to use in an ML model, so I need to convert them into SparkML sparse vectors.
root
|-- colA: map (nullable = true)
| |-- key: string
| |-- value: double (valueContainsNull = true)
|-- colB: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- colC: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Context: SparkML models require the data to be formed as a feature vector. There are some utilities to generate the feature vector, but none of them supports the MapType. E.g. the SparkML VectorAssembler allows combining several columns (all numeric types, boolean type, or vector type).
Edit:
So far my solution is to explode the map into individual columns and then use the VectorAssembler:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.explode

// One pass over the data to collect and sort the distinct keys of colA
val listKeysColA = df.select(explode($"colA"))
  .select($"key").as[String].distinct.collect.sorted
// One column per key; missing entries are filled with 0
val exploded = df.select(listKeysColA.map(x =>
  $"colA".getItem(x).alias(x)): _*).na.fill(0)
val columnNames = exploded.columns
val assembler = new VectorAssembler().setInputCols(columnNames).setOutputCol("features")
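The assembler can then be applied in the usual way (a minimal sketch; exploded and assembler are the values defined above):

val features = assembler.transform(exploded)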
Edit2:
I should add that the data in my maps is very sparse and there is no known set of keys beforehand. That is why in my current solution I do one pass over the data first to collect and sort the keys, and then access the values using getItem(keyName).
As far as I know, there is no built-in method for this in Spark, so a UDF would be a suitable solution in this case. Here is one that takes a column with a Map[String, Double] and returns an ML vector:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.udf

val toVector = udf((m: Map[String, Double]) => Vectors.dense(m.values.toArray).toSparse)
Since a Map does not have an order, the resulting vector is not guaranteed to have a certain order either.
Example input (df):
+---------------------------------+---------------------------------+
|colA |colB |
+---------------------------------+---------------------------------+
|Map(a -> 1.0, b -> 2.0, c -> 3.0)|Map(a -> 1.0, b -> 2.0, c -> 3.0)|
+---------------------------------+---------------------------------+
and using the UDF,
val df2 = df.withColumn("colA", toVector($"colA")).withColumn("colB", toVector($"colB"))
gives the following output:
+-------------+-------------+
|colA |colB |
+-------------+-------------+
|[1.0,2.0,3.0]|[1.0,2.0,3.0]|
+-------------+-------------+
where both columns are of vector type.
root
|-- colA: vector (nullable = true)
|-- colB: vector (nullable = true)
If you want to merge all the columns together into a single vector, it would be appropriate to use a VectorAssembler here, as in the question's edit.
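A minimal sketch of that step, assuming the df2 with two vector columns from above:

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("colA", "colB"))
  .setOutputCol("features")
val merged = assembler.transform(df2)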
Edit:
If you need to keep a certain order of the values, then you need to collect all keys first, as you have done. You can avoid using explode, however:
import spark.implicits._ // for the String encoder used by flatMap

// Collect all distinct keys across the dataset and sort them
val keys = df.select($"colA")
  .flatMap(_.getAs[Map[String, Double]]("colA").keys)
  .distinct
  .collect
  .sorted
Then change the UDF appropriately to take the order of keys into account, with a default value of 0.0:
val toVector = udf((m: Map[String, Double]) =>
  // Look up each key in sorted order, defaulting to 0.0 for missing keys
  Vectors.dense(keys.map(key => m.getOrElse(key, 0.0))).toSparse
)
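Used the same way as before (a sketch, with df as in the example input), the positions in the resulting vectors now follow the sorted key order:

val dfOrdered = df.withColumn("colA", toVector($"colA"))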