I would like to do some NLP analysis on a string column in a PySpark DataFrame.
df:

year  month  u_id  rating_score  p_id  review
2010  09     tvwe  1             p_5   I do not like it because its size is not for me.
2011  11     frsa  1             p_7   I am allergic to the peanut elements.
2015  5      ybfd  1             p_2   It is a repeated one, please no more.
2016  7      tbfb  2             p_2   It is not good for my oil hair.
Each p_id represents an item, and each u_id may have several reviews for each item. A review can be a few words, a sentence, a paragraph, or even emoji.
I would like to find the root causes of items being rated low or high; for example, how many u_ids complain about an item's size, an allergy to its chemical ingredients, or other issues relevant to the items' features.
From How to iterate over rows in a DataFrame in Pandas, I learned that it is more efficient to convert the DataFrame to a NumPy array and then use vectorization for the NLP analysis.
I am trying to use Spark NLP to extract the adjectives and noun phrases from each comment, grouped by year, month, u_id and p_id, but I am not sure how to apply NumPy vectorization to do this efficiently.
My py3 code:
from sparknlp.pretrained import PretrainedPipeline
df = spark.sql('select year, month, u_id, p_id, comment from MY_DF where rating_score = 1 and isnull(comment) = false')
import numpy as np
trainseries = df['comment'].apply(lambda x : np.array(x.toArray())).as_matrix().reshape(-1,1)
text = np.apply_along_axis(lambda x : x[0], 1, trainseries) # TypeError: 'Column' object is not callable
pipeline_dl = PretrainedPipeline('explain_document_dl', lang='en')
result = pipeline_dl.fullAnnotate(text)
The code does not work. I also need to keep the other columns (year, month, u_id, p_id) during the vectorization and ensure that the NLP results stay aligned with them.
I would rather not use the approach from How to convert a pyspark dataframe column to numpy array because collect() is too slow.
Thanks
IIUC, you don't need NumPy (Spark handles vectorization internally); just run transform and then select and filter the proper information from the resulting DataFrame:
from sparknlp.pretrained import PretrainedPipeline
df = spark.sql('select year, month, u_id, rating_score, p_id, comment from MY_DF where isnull(comment) = false')
df1 = df.withColumnRenamed('comment', 'text')
pipeline_dl = PretrainedPipeline('explain_document_dl', lang='en')
result = pipeline_dl.transform(df1)
df_new = result.selectExpr(
    *df1.columns,
    # keep annotations whose POS tag starts with NN (nouns) or JJ (adjectives),
    # then map each one back to the original word stored in its metadata
    'transform(filter(pos, p -> p.result rlike "^(?:NN|JJ)"), x -> x.metadata.word) as words'
)
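Here pos is Spark NLP's annotation array: each element is a struct whose result field holds the Penn Treebank tag and whose metadata.word holds the original token. If you prefer Python functions over a SQL expression string, the same filter/transform pair exists as higher-order functions in pyspark.sql.functions (a sketch, assuming Spark >= 3.1):

from pyspark.sql import functions as F

df_new = result.select(
    *df1.columns,
    F.transform(
        # keep only noun (NN*) and adjective (JJ*) annotations
        F.filter('pos', lambda p: p['result'].rlike('^(?:NN|JJ)')),
        # map each surviving annotation back to its token
        lambda x: x['metadata']['word']
    ).alias('words')
)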
Output:
df_new.show(10,0)
+-----+-----+----+------------+----+------------------------------------------------+----------------------------+
|year |month|u_id|rating_score|p_id|text                                            |words                       |
+-----+-----+----+------------+----+------------------------------------------------+----------------------------+
|2010 |09 |tvwe|1 |p_5 |I do not like it because its size is not for me.|[size] |
|2011 |11 |frsa|1 |p_7 |I am allergic to the peanut elements. |[allergic, peanut, elements]|
|2015 |5 |ybfd|1 |p_2 |It is a repeated one, please no more. |[more] |
|2016 |7 |tbfb|2 |p_2 |It is not good for my oil hair. |[good, oil, hair] |
+-----+-----+----+------------+----+------------------------------------------------+----------------------------+
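Since the end goal is to count how many u_ids raise each issue, a possible follow-up on df_new is to explode the extracted words and count distinct reviewers per item and word. This is only a sketch (issue_counts is a hypothetical name, and the lower-casing is just an illustrative normalization):

from pyspark.sql import functions as F

issue_counts = (
    df_new
    # one row per (review, extracted word)
    .select('p_id', 'u_id', F.explode('words').alias('word'))
    # normalize case so "Size" and "size" count together
    .groupBy('p_id', F.lower('word').alias('word'))
    # count distinct reviewers mentioning each word for each item
    .agg(F.countDistinct('u_id').alias('n_users'))
    .orderBy(F.desc('n_users'))
)
issue_counts.show()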
Note:
(1) result = pipeline_dl.fullAnnotate(df, 'comment') is a shortcut for renaming comment to text and then running pipeline_dl.transform(df1); the first argument of fullAnnotate can be a DataFrame, a list or a string (see the quick example after these notes).
(2) The list of POS tags comes from https://www.ling.upenn.edu/courses/Fall_2003/ling001/penn_treebank_pos.html
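For example, running note (1)'s fullAnnotate on a plain string (a quick sketch; I'm assuming the usual Spark NLP Annotation objects, whose result holds the tag and whose metadata dict holds the word):

annotations = pipeline_dl.fullAnnotate('I am allergic to the peanut elements.')
# one dict per input text, keyed by output column name ('token', 'pos', ...)
print([(a.result, a.metadata['word']) for a in annotations[0]['pos']])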