Tags: pyspark, nlp, databricks, lemmatization

Trouble applying a UDF to a column in a PySpark DataFrame


My goal is to clean the data in a column of a PySpark DataFrame. I have written a function for the cleaning:

def preprocess(text):
    text = text.lower()
    text = text.strip()
    text = re.compile('<.*?>').sub('', text)                                  # strip HTML tags
    text = re.compile('[%s]' % re.escape(string.punctuation)).sub(' ', text)  # replace punctuation with spaces
    text = re.sub(r'\s+', ' ', text)                                          # collapse whitespace
    text = re.sub(r'\[[0-9]*\]', ' ', text)                                   # drop bracketed reference numbers
    text = re.sub(r'[^\w\s]', '', text.lower().strip())                       # remove remaining non-word characters
    text = re.sub(r'\d', ' ', text)                                           # remove digits
    text = re.sub(r'\s+', ' ', text)                                          # collapse whitespace again
    return text

 

#LEMMATIZATION
# Initialize the lemmatizer
wl = WordNetLemmatizer()

stop_words = set(stopwords.words('english'))
def remove_stopwords(text):
    text = [i for i in text.split() if i not in stop_words]  # keep only non-stopword tokens
    return text
 
# This is a helper function to map NLTK POS tags to WordNet POS tags
def get_wordnet_pos(tag):
    if tag.startswith('J'):
        return wordnet.ADJ
    elif tag.startswith('V'):
        return wordnet.VERB
    elif tag.startswith('N'):
        return wordnet.NOUN
    elif tag.startswith('R'):
        return wordnet.ADV
    else:
        return wordnet.NOUN
# Tokenize, POS-tag, and lemmatize a sentence
def lemmatizer(string):
    word_pos_tags = nltk.pos_tag(word_tokenize(string))  # get (word, POS tag) pairs
    a = [wl.lemmatize(word, get_wordnet_pos(pos)) for word, pos in word_pos_tags]  # lemmatize each token with its mapped POS tag
    return " ".join(a)

#Final Function
def finalpreprocess(string):
    return lemmatizer(' '.join(remove_stopwords(preprocess(string))))

The functions seem to work fine when I test them. When I do

text = 'Ram and Bheem are buddies. They (both) like <b>running</b>. They got better at it over the weekend'

print(finalpreprocess(text))

I see the exact result I want.

ram bheem buddy like run get well weekend

However, when I try to apply finalpreprocess() to a column in a PySpark DataFrame, I get errors. Here is what I did.

udf_txt_clean = udf(lambda x: finalpreprocess(x), StringType())
df.withColumn("cleaned_text", udf_txt_clean(col("reason"))).select("reason", "cleaned_text").show(10, False)

Then I get this error:

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 473, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.RLock' object
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

Here is what I have tried so far. finalpreprocess() uses three different functions: preprocess(), remove_stopwords(), and lemmatizer(). So I changed udf_txt_clean to wrap each of them in turn, like

udf_txt_clean = udf(lambda x: preprocess(x), StringType())
udf_txt_clean = udf(lambda x: remove_stopwords(x), StringType())

These two run fine, but

udf_txt_clean = udf(lambda x: lemmatizer(x), StringType())

is the one that gives me the error. I cannot understand why this function fails but the other two do not. From my limited understanding, Spark is having trouble pickling this function, but I do not understand why it needs to pickle it in the first place, or whether there is a workaround.


Solution

  • It would help if the example were more reproducible next time. It took a bit to re-create this. No worries, though, I have a solution here.

    First, cloudpickle is the mechanism Spark uses to ship a function from the driver to the workers: functions are pickled and then sent over for execution. So something your function uses can't be pickled. To test quickly, you can just use:

    import cloudpickle
    cloudpickle.dumps(x)
    

    where x is whatever you want to check for cloudpickle-ability. In this case, I tried a few of the objects involved and found that wordnet is not serializable. You can test with:

    cloudpickle.dumps(wordnet)
    

    and it will reproduce the issue. You can get around this by importing the stuff that can't be pickled inside your function, as the end-to-end example further down does.
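
    If you want to check all the likely suspects at once, here is a minimal probe (an illustrative sketch, assuming wl, stop_words, and wordnet from your question are defined in the current session):

    import cloudpickle

    # Try to serialize each module-level object the UDF closure would capture
    # and report which ones cloudpickle rejects.
    for name, obj in {"wl": wl, "stop_words": stop_words, "wordnet": wordnet}.items():
        try:
            cloudpickle.dumps(obj)
            print(name, "is picklable")
        except Exception as err:
            print(name, "is NOT picklable:", err)

    With the culprit confirmed, here is the end-to-end example: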

    import re
    import string
    import nltk
    import pandas as pd
    from nltk.stem import WordNetLemmatizer
    from nltk.corpus import stopwords
    from nltk.tokenize import word_tokenize
    from nltk.corpus import wordnet
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, IntegerType, StringType
    # The NLTK corpora (punkt, stopwords, wordnet, averaged_perceptron_tagger)
    # must already be available on the driver and on the workers.
    
    def preprocess(text):
        text = text.lower()
        text = text.strip()
        text = re.compile('<.*?>').sub('', text)                                  # strip HTML tags
        text = re.compile('[%s]' % re.escape(string.punctuation)).sub(' ', text)  # replace punctuation with spaces
        text = re.sub(r'\s+', ' ', text)                                          # collapse whitespace
        text = re.sub(r'\[[0-9]*\]', ' ', text)                                   # drop bracketed reference numbers
        text = re.sub(r'[^\w\s]', '', text.lower().strip())                       # remove remaining non-word characters
        text = re.sub(r'\d', ' ', text)                                           # remove digits
        text = re.sub(r'\s+', ' ', text)                                          # collapse whitespace again
        return text
    
    
    #LEMMATIZATION
    # Initialize the lemmatizer
    wl = WordNetLemmatizer()
    
    stop_words = set(stopwords.words('english'))
    def remove_stopwords(text):
        text = [i for i in text.split() if i not in stop_words]  # keep only non-stopword tokens
        return text
     
    def lemmatizer(string):
        from nltk.corpus import wordnet
        def get_wordnet_pos(tag):
            if tag.startswith('J'):
                return wordnet.ADJ
            elif tag.startswith('V'):
                return wordnet.VERB
            elif tag.startswith('N'):
                return wordnet.NOUN
            elif tag.startswith('R'):
                return wordnet.ADV
            else:
                return wordnet.NOUN
        word_pos_tags = nltk.pos_tag(word_tokenize(string))  # get (word, POS tag) pairs
        a = [wl.lemmatize(word, get_wordnet_pos(pos)) for word, pos in word_pos_tags]  # lemmatize each token with its mapped POS tag
        return " ".join(a)
    
    #Final Function
    def finalpreprocess(string):
        return lemmatizer(' '.join(remove_stopwords(preprocess(string))))
    
    spark = SparkSession.builder.getOrCreate()
    text = 'Ram and Bheem are buddies. They (both) like <b>running</b>. They got better at it over the weekend'
    test = pd.DataFrame({"test": [text]})
    sdf = spark.createDataFrame(test)
    udf_txt_clean = udf(lambda x: finalpreprocess(x), StringType())
    sdf.withColumn("cleaned_text", udf_txt_clean(col("test"))).select("test", "cleaned_text").show(10, False)
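
    If everything is wired up, the cleaned_text column for the sample sentence should match what you got locally, i.e. ram bheem buddy like run get well weekend.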