Search code examples
pythonapache-sparkpysparkapache-spark-sqlapache-spark-ml

Problem applying UDF cosine similarity to grouped ML vectors in Pyspark


I'm having an error when applying an UDF (dot_group) to grouped data. This UDF has the aim to compute pair-wise cosine similarities among the ML Vector of each group made from the features column. The groups are made according to the prediction column of the input data (cdf). The result should be an ArrayType, where each item is a resultant similarity, writen to the cosines column. This is my attempt to do it:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors
from itertools import combinations
from numpy import linalg as LA


def g_dot(M):
    combs = combinations(M, 2)
    return [i.dot(j) /(LA.norm(i) * LA.norm(j)) \
                                            for i, j in combs]
dot_group = F.udf(g_dot, ArrayType(DoubleType()))


cdf = spark.createDataFrame(
            [(1.0, Vectors.dense([0.0, 10.0, 0.5])), 
             (0.0, Vectors.dense([0.0, 1.0, 0.5])),
             (1.0, Vectors.dense([0.0, 10.0, 0.1])),
             (0.0, Vectors.dense([10.0, 10.0, 0.5])),
             (1.0, Vectors.dense([0.0, 0.0, 0.5])),],
            ["prediction", "features"])

dfs = cdf.groupBy(["prediction"]) \
         .agg(F.collect_list("features").alias("data")) \
         .withColumn("cosines", dot_group("data"))
dfs.show()

... Which gives the following error. I'm not sure why this error rises, but it seems that there are problems serializing numpy operations:

19/02/19 16:21:39 ERROR Executor: Exception in task 0.0 in stage 2093.0 (TID 1185)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:156)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:155)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
19/02/19 16:21:39 WARN TaskSetManager: Lost task 0.0 in stage 2093.0 (TID 1185, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)

...
19/02/19 16:21:39 ERROR TaskSetManager: Task 0 in stage 2093.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Aplication/spark/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 336, in show
    print(self._jdf.showString(n, 20))
  File "/Aplication/spark/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Aplication/spark/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Aplication/spark/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2000.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2093.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2093.0 (TID 1185, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:156)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:155)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Solution

  • That's because Spark SQL doesn't support NumPy types. You should convert values to float before returning

    @F.udf(ArrayType(DoubleType()))
    def dot_group(M):
        combs = combinations(M, 2)
        return [
            # or float(i.dot(j) / (LA.norm(i) * LA.norm(j)))
            (i.dot(j) / (LA.norm(i) * LA.norm(j))).tolist()
            for i, j in combs
        ]