My goal is to clean the Data in a column in a Pyspark DF. I have written a function for cleaning .
def preprocess(text):
text = text.lower()
text=text.strip()
text=re.compile('<.*?>').sub('', text)
text = re.compile('[%s]' % re.escape(string.punctuation)).sub(' ', text)
text = re.sub('\s+', ' ', text)
text = re.sub(r'\[[0-9]*\]',' ',text)
text=re.sub(r'[^\w\s]', '', text.lower().strip())
text = re.sub(r'\d',' ',text)
text = re.sub(r'\s+',' ',text)
return text
#LEMMATIZATION
# Initialize the lemmatizer
wl = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
def remove_stopwords(text):
text = [i for i in text.split() if not i in stop_words]
return text
# This is a helper function to map NTLK position tags
def get_wordnet_pos(tag):
if tag.startswith('J'):
return wordnet.ADJ
elif tag.startswith('V'):
return wordnet.VERB
elif tag.startswith('N'):
return wordnet.NOUN
elif tag.startswith('R'):
return wordnet.ADV
else:
return wordnet.NOUN
# Tokenize the sentence
def lemmatizer(string):
word_pos_tags = nltk.pos_tag(word_tokenize(string)) # Get position tags
a=[wl.lemmatize(tag[0], get_wordnet_pos(tag[1])) for idx, tag in enumerate(word_pos_tags)] # Map the position tag and lemmatize the word/token
return " ".join(a)
#Final Function
def finalpreprocess(string):
return lemmatizer(' '.join(remove_stopwords(preprocess(string))))
The functions seems to work fine when I test it . When I do
text = 'Ram and Bheem are buddies. They (both) like <b>running</b>. They got better at it over the weekend'
print(finalpreprocess(text))
I see the exact result I want.
ram bheem buddy like run get well weekend
How ever when I try to apply this function finalpreprocess() to a column in pyspark dataframe . I am getting errors. Here is what I did.
udf_txt_clean = udf(lambda x: finalpreprocess(x),StringType()) df.withColumn("cleaned_text",lem(col("reason"))).select("reason","cleaned_text").show(10,False)
Then I am getting the error :
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 473, in dumps
return cloudpickle.dumps(obj, pickle_protocol)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 563, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.RLock' object
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
So far here is what I did. In my finalpreprocess(), I am using three different functions preprocess(),remove_stopwords(), lemmatizer() . I changed my udf_txt_clean accordingly . Like
udf_txt_clean = udf(lambda x: preprocess(x),StringType())
udf_txt_clean = udf(lambda x: remove_stopwords(x),StringType())
These two run fine But -
udf_txt_clean = udf(lambda x: lemmatizer (x),StringType())
is the one that is giving me the error. I am not able to understand why this function is giving the error but not the other two. From my limited understating I see that its having trouble trying to pickle this function but I am not able to understand why its trying to pickle this in the first place or if there is a work around for it.
It would help if the example were more reproducible next time. It took a bit to re-create this. No worries, though,I have a solution here.
First, cloudpickle
is the mechanism of Spark to move a function from drivers to workers. So functions are pickled and then sent to the workers for execution. So something you are using can't be pickled. In order to quickly test, you can just use:
import cloudpickle
cloudpickle.dumps(x)
where x is something that you are testing if it's cloudpickle-able. In this case, I tried a couple of times and found wordnet
not to be serializable. You can test with:
cloudpickle.dumps(wordnet)
and it will reproduce the issue. You can get around this by importing the stuff that can't be pickled inside your function. Here is an end-to-end example for you.
import re
import pandas as pd
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
import string
from nltk.tokenize import word_tokenize
from nltk.corpus import wordnet
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType,IntegerType,StringType
def preprocess(text):
text = text.lower()
text=text.strip()
text=re.compile('<.*?>').sub('', text)
text = re.compile('[%s]' % re.escape(string.punctuation)).sub(' ', text)
text = re.sub('\s+', ' ', text)
text = re.sub(r'\[[0-9]*\]',' ',text)
text=re.sub(r'[^\w\s]', '', text.lower().strip())
text = re.sub(r'\d',' ',text)
text = re.sub(r'\s+',' ',text)
return text
#LEMMATIZATION
# Initialize the lemmatizer
wl = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
def remove_stopwords(text):
text = [i for i in text.split() if not i in stop_words]
return text
def lemmatizer(string):
from nltk.corpus import wordnet
def get_wordnet_pos(tag):
if tag.startswith('J'):
return wordnet.ADJ
elif tag.startswith('V'):
return wordnet.VERB
elif tag.startswith('N'):
return wordnet.NOUN
elif tag.startswith('R'):
return wordnet.ADV
else:
return wordnet.NOUN
word_pos_tags = nltk.pos_tag(word_tokenize(string)) # Get position tags
a=[wl.lemmatize(tag[0], get_wordnet_pos(tag[1])) for idx, tag in enumerate(word_pos_tags)] # Map the position tag and lemmatize the word/token
return " ".join(a)
#Final Function
def finalpreprocess(string):
return lemmatizer(' '.join(remove_stopwords(preprocess(string))))
spark = SparkSession.builder.getOrCreate()
text = 'Ram and Bheem are buddies. They (both) like <b>running</b>. They got better at it over the weekend'
test = pd.DataFrame({"test": [text]})
sdf = spark.createDataFrame(test)
udf_txt_clean = udf(lambda x: finalpreprocess(x),StringType())
sdf.withColumn("cleaned_text",udf_txt_clean(col("test"))).select("test","cleaned_text").show(10,False)