Tags: python, pandas, apache-spark, pyspark, pickle

Using external library in PySpark UDF pickle error


I'm trying the following code:

import pandas as pd
from pymorphy2 import MorphAnalyzer
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("udf").getOrCreate()

def gender(s):
    m = MorphAnalyzer()
    return m.parse(s)[0].tag.gender

gen = F.udf(gender, T.StringType())

df = spark.createDataFrame(pd.DataFrame({"name": ["кирилл", "вавила"]}))

df.select(gen("name").alias("gender")).show()

and more or less expectedly getting the following error message:

ERROR Executor: Exception in task 2.0 in stage 29.0 (TID 151)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.cloudpickle.cloudpickle._make_skeleton_class). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)

What would be the easiest way to circumvent this error (if any)?


Solution

  • You could use pandas_udf, which is vectorized (and more efficient than a regular udf).

    import pandas as pd
    from pymorphy2 import MorphAnalyzer
    from pyspark.sql import SparkSession, types as T, functions as F
    
    spark = SparkSession.builder.appName("udf").getOrCreate()
    
    @F.pandas_udf(T.StringType())
    def gender(s: pd.Series) -> pd.Series:
        return s.apply(lambda x: MorphAnalyzer().parse(x)[0].tag.gender)
    
    df = spark.createDataFrame(pd.DataFrame({"name": ["кирилл", "вавила", "софия"]}))
    df.withColumn("gender", gender("name")).show()
    # +------+------+
    # |  name|gender|
    # +------+------+
    # |кирилл|  masc|
    # |вавила|  masc|
    # | софия|  femn|
    # +------+------+
    

    If you're importing pyspark.sql.types solely for the pandas_udf return type, you can pass the type as a string instead and drop the import:

    @F.pandas_udf('string')
    def gender(s: pd.Series) -> pd.Series:
        return s.apply(lambda x: MorphAnalyzer().parse(x)[0].tag.gender)
    

    Update: you could also use the Iterator[pd.Series] -> Iterator[pd.Series] variant of pandas_udf, so that the MorphAnalyzer object is created once per partition and reused across all of its batches, instead of once per row:

    import pandas as pd
    from pymorphy2 import MorphAnalyzer
    from pyspark.sql import SparkSession, types as T, functions as F
    from typing import Iterator
    
    spark = SparkSession.builder.appName("udf").getOrCreate()
    
    @F.pandas_udf(T.StringType())
    def gender(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # The analyzer is constructed once per partition and reused
        # for every batch of rows Spark feeds in.
        m = MorphAnalyzer()
        for batch in batches:
            yield batch.apply(lambda x: m.parse(x)[0].tag.gender)