I am trying to use the foreachPartition() method in PySpark on an RDD that has 8 partitions. My custom function tries to generate a string output for a given string input. Here is the code:
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
import pandas as pd
import datetime
def compute_sentiment_score(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text, type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.score)

def compute_sentiment_magnitude(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text, type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.magnitude)
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/path-to-file.json"
imdb_reviews = pd.read_csv('imdb_reviews.csv', header=None, names=['input1', 'input2'], encoding= "ISO-8859-1")
imdb_reviews.head()
input1 input2
0 first think another Disney movie, might good, ... 1
1 Put aside Dr. House repeat missed, Desperate H... 0
2 big fan Stephen King's work, film made even gr... 1
3 watched horrid thing TV. Needless say one movi... 0
4 truly enjoyed film. acting terrific plot. Jeff... 1
spark_imdb_reviews = spark.createDataFrame(imdb_reviews) # create spark dataframe
spark_imdb_reviews.printSchema()
root
|-- input1: string (nullable = true)
|-- input2: long (nullable = true)
And this is my custom function -
def compute_sentiment_score(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text, type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.score)

def compute_sentiment_magnitude(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text, type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.magnitude)
Here is how I try to use the foreachPartition() method -
create_rdd = spark_imdb_reviews.select("input1").rdd # create RDD
print(create_rdd.getNumPartitions()) # print the partitions
print(create_rdd.take(1)) # display data
new_rdd = create_rdd.foreachPartition(compute_sentiment_score) # compute score
Which gives this output and an error -
8
[Row(input1="first think another Disney movie, might good, it's kids movie. watch it, can't help enjoy it. ages love movie. first saw movie 10 8 years later still love it! Danny Glover superb could play part better. Christopher Lloyd hilarious perfect part. Tony Danza believable Mel Clark. can't help, enjoy movie! give 10/10!")]
File "<ipython-input-106-e3fd65ce75cc>", line 3, in compute_sentiment_score
TypeError: <itertools.chain object at 0x11ab7f198> has type itertools.chain, but expected one of: bytes, unicode
There are two similar functions: foreachPartition and mapPartitions.
Both functions expect another function as a parameter (here compute_sentiment_score). This function receives the content of a partition in the form of an iterator. The text parameter in the question is therefore actually an iterator, which can be looped over inside compute_sentiment_score.
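The error in the question can be reproduced without Spark or the Google client. The following is only a minimal sketch: score_text is a hypothetical stand-in for the sentiment call (it just counts words), and itertools.chain simulates the iterator that Spark passes for one partition.

```python
from itertools import chain


def score_text(text):
    """Hypothetical stand-in for the sentiment call: accepts one string only."""
    if not isinstance(text, str):
        raise TypeError(f"{type(text).__name__} received, expected str")
    return len(text.split())


def score_partition(rows):
    """Partition-level function: loop over the rows and score each one."""
    for text in rows:
        yield score_text(text)


# Spark hands a partition function an iterator, simulated here with chain.
partition = chain(["good movie"], ["truly enjoyed this film"])

try:
    # This mirrors the question: a per-text function is given the whole iterator.
    score_text(partition)
except TypeError as err:
    print("per-text function failed:", err)

# Iterating over the partition and scoring row by row works.
partition = chain(["good movie"], ["truly enjoyed this film"])
print(list(score_partition(partition)))
```

The first call fails for the same reason as in the question: the function receives the iterator itself, not a single string.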
The difference between foreachPartition and mapPartitions is that foreachPartition is a Spark action while mapPartitions is a transformation. This means the code passed to foreachPartition is executed immediately, nothing is returned and the RDD remains unchanged, while mapPartitions is evaluated lazily and can be used to create a new RDD. In order to store the calculated sentiment scores, mapPartitions should be used.
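Before applying this to the sentiment code, the distinction can be illustrated in plain Python. This is only a rough emulation under my own assumptions, not how Spark implements either method: foreach_partition and map_partitions are hypothetical helpers, and a list of lists plays the role of the partitioned RDD.

```python
def foreach_partition(partitions, func):
    """Action-like: run func on every partition for its side effects only."""
    for part in partitions:
        func(iter(part))
    # Falls through, so the caller gets None, as with RDD.foreachPartition.


def map_partitions(partitions, func):
    """Transformation-like: lazily build new partitions from func's output."""
    return (list(func(iter(part))) for part in partitions)


partitions = [["a", "bb"], ["ccc"]]  # two toy partitions

# The action runs immediately; only its side effect (filling `seen`) survives.
seen = []
result = foreach_partition(partitions, lambda rows: seen.extend(rows))
print(result)
print(seen)

# The transformation yields new data, computed only when it is consumed.
lengths = map_partitions(partitions, lambda rows: (len(r) for r in rows))
print(list(lengths))
```

This is why assigning the result of foreachPartition to new_rdd, as in the question, cannot produce an RDD of scores.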
def compute_sentiment_score(itr_text):
    # set up the things that are expensive and should be prepared only once per partition
    client = language.LanguageServiceClient()
    # run the loop for each row of the partition
    for text in itr_text:
        # each element is a Row; in the question the text column is named input1
        document = types.Document(content=text.input1, type=enums.Document.Type.PLAIN_TEXT, language='en')
        sentiment = client.analyze_sentiment(document=document).document_sentiment
        yield (text.input1, sentiment.score)

df = spark_imdb_reviews.select("input1")  # the DataFrame from the question
df_with_score = df.rdd.mapPartitions(compute_sentiment_score)
df_with_score.foreach(print)
In this example client = language.LanguageServiceClient() is called once per partition. If creating the client is expensive, it may be worth reducing the number of partitions, for example with coalesce.
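To make the once-per-partition cost concrete, here is a small sketch in plain Python. FakeClient is a hypothetical stand-in for the real client that only counts how often it is constructed, and the lists eight and two are assumed toy partitionings of the same data.

```python
class FakeClient:
    """Hypothetical stand-in for an expensive client; counts its instances."""
    created = 0

    def __init__(self):
        FakeClient.created += 1

    def score(self, text):
        return len(text)


def score_partition(rows):
    client = FakeClient()  # built once per partition, not once per row
    for text in rows:
        yield text, client.score(text)


def count_clients(partitions):
    """Process every partition and report how many clients were built."""
    FakeClient.created = 0
    for part in partitions:
        list(score_partition(iter(part)))  # consume the generator
    return FakeClient.created


reviews = [f"review {i}" for i in range(16)]
eight = [reviews[i::8] for i in range(8)]  # 8 partitions of 2 rows each
two = [reviews[i::2] for i in range(2)]    # 2 partitions of 8 rows each

print(count_clients(eight))  # 8
print(count_clients(two))    # 2
```

In Spark the equivalent step would be something like df.rdd.coalesce(2).mapPartitions(compute_sentiment_score). The trade-off is that fewer partitions also mean less parallelism, so the right number depends on how costly the client setup is compared to the per-row calls.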