Hi, I have the following dataset column:
+-----------------------+
|hashes |
+-----------------------+
|[[-7.0], [0.0], [5.0]] |
|[[-8.0], [1.0], [1.0]] |
|[[-6.0], [1.0], [1.0]] |
+-----------------------+
which has been generated by:
val brp = new BucketedRandomProjectionLSH().
  setBucketLength(2).
  setNumHashTables(3).
  setInputCol("features").
  setOutputCol("hashes")
val model = brp.fit(dfVa)
val dfHash = model.transform(dfVa)
With the following schema:
|-- hashes: array (nullable = true)
| |-- element: vector (containsNull = true)
I'd like to cross join it with another dataset that has the same column, and calculate the Euclidean distance with a UDF I made:
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
import scala.math.sqrt
val euclideanDistance = udf { (v1: Vector, v2: Vector) =>
  sqrt(Vectors.sqdist(v1, v2))
}
cookesWb
  .join(broadcast(cookesNext))
  .withColumn("Distance", euclideanDistance(
    cookesWb.col("hashes"),
    cookesNext.col("hashes")))
  .filter(col("Distance").lt(80))
However, I get the following error:
cannot resolve 'UDF(hashes, hashes)' due to data type mismatch: argument 1 requires vector type, however, '`hashes`' is of array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
Do you know how to convert that messy type to a Vector so that I can execute the function?
Thanks.
Here you have an array of Spark ML vectors. To be able to use your UDF, you first need to convert it to a single vector. We can define another UDF for this.
import scala.collection.mutable.WrappedArray
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

val toVect = udf { (x: WrappedArray[Vector]) =>
  // flatten the array of one-element vectors into a single dense vector
  val flatArray: Array[Double] = x.flatMap(_.toArray).toArray
  Vectors.dense(flatArray)
}
NB: Array[Vector] would not work here. When you manipulate array columns in Spark with UDFs, WrappedArray is the type you need to use.
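As a side note, since WrappedArray extends Seq, declaring the UDF parameter as a Seq also works. A minimal sketch over a plain array<double> column (the column name "nums" is hypothetical):
import org.apache.spark.sql.functions.{col, udf}
// Spark passes an array column to a Scala UDF as a WrappedArray;
// Seq[Double] matches because WrappedArray extends Seq.
val sumArray = udf { (xs: Seq[Double]) => xs.sum }
// hypothetical usage: df.withColumn("total", sumArray(col("nums")))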
Then you can just perform your crossJoin, for instance like this:
df
  .crossJoin(df2)
  .withColumn("d", euclideanDistance(
    toVect(df.col("hashes")),
    toVect(df2.col("hashes"))))
  .show()
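Applied to your own datasets and the distance threshold from your question, that becomes (a sketch assuming cookesWb and cookesNext both carry the hashes column shown above):
cookesWb
  .crossJoin(cookesNext)
  .withColumn("Distance", euclideanDistance(
    toVect(cookesWb.col("hashes")),
    toVect(cookesNext.col("hashes"))))
  .filter(col("Distance").lt(80))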