I'm trying the following code:
import pandas as pd
from pymorphy2 import MorphAnalyzer
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("udf").getOrCreate()
def gender(s):
    m = MorphAnalyzer()
    return m.parse(s)[0].tag.gender
gen = F.udf(gender, T.StringType())
df = spark.createDataFrame(pd.DataFrame({"name": ["кирилл", "вавила"]}))
df.select(gen("name").alias("gender")).show()
and, more or less expectedly, I'm getting the following error message:
ERROR Executor: Exception in task 2.0 in stage 29.0 (TID 151)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.cloudpickle.cloudpickle._make_skeleton_class). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
What would be the easiest way (if any) to circumvent this error?
You could use pandas_udf, which is vectorized (more efficient than a regular udf).
import pandas as pd
from pymorphy2 import MorphAnalyzer
from pyspark.sql import SparkSession, types as T, functions as F
spark = SparkSession.builder.appName("udf").getOrCreate()
# scalar pandas UDF: receives and returns a whole pandas Series per batch
@F.pandas_udf(T.StringType())
def gender(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: MorphAnalyzer().parse(x)[0].tag.gender)
df = spark.createDataFrame(pd.DataFrame({"name": ["кирилл", "вавила", "софия"]}))
df.withColumn("gender", gender("name")).show()
# +------+------+
# | name|gender|
# +------+------+
# |кирилл| masc|
# |вавила| masc|
# | софия| femn|
# +------+------+
If you are importing pyspark.sql.types solely for the pandas_udf return type, you may not need it at all: the return type can be passed as a plain string, so the function could be defined like this:
@F.pandas_udf('string')
def gender(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: MorphAnalyzer().parse(x)[0].tag.gender)
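This variant behaves the same way; for example, reusing the df created above:
df.withColumn("gender", gender("name")).show()  # same output as shown earlier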
Update: You could also use the Iterator variant of pandas_udf (Iterator[pd.Series] -> Iterator[pd.Series]) so that the MorphAnalyzer object is created once and reused for every batch in the iterator, instead of being re-created for each value.
import pandas as pd
from pymorphy2 import MorphAnalyzer
from pyspark.sql import SparkSession, types as T, functions as F
from typing import Iterator
spark = SparkSession.builder.appName("udf").getOrCreate()
@F.pandas_udf(T.StringType())
def gender(s: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # initialize the analyzer once and reuse it for all batches in the iterator
    m = MorphAnalyzer()
    for e in s:
        yield e.apply(lambda x: m.parse(x)[0].tag.gender)
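A minimal sketch to exercise the iterator-based UDF end to end (assuming the same toy DataFrame as above); the result should match the earlier output:
df = spark.createDataFrame(pd.DataFrame({"name": ["кирилл", "вавила", "софия"]}))
df.withColumn("gender", gender("name")).show()
# +------+------+
# | name|gender|
# +------+------+
# |кирилл| masc|
# |вавила| masc|
# | софия| femn|
# +------+------+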