Tags: pyspark, rdd

How to use foreachPartition on a pyspark dataframe?


I am trying to use the foreachPartition() method in pyspark on an RDD that has 8 partitions. My custom function tries to generate a string output for a given string input. Here is the code -

from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
import pandas as pd
import datetime

import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/path-to-file.json"

imdb_reviews = pd.read_csv('imdb_reviews.csv', header=None, names=['input1', 'input2'], encoding= "ISO-8859-1")

imdb_reviews.head()

    input1                                         input2
0   first think another Disney movie, might good, ...   1
1   Put aside Dr. House repeat missed, Desperate H...   0
2   big fan Stephen King's work, film made even gr...   1
3   watched horrid thing TV. Needless say one movi...   0
4   truly enjoyed film. acting terrific plot. Jeff...   1


spark_imdb_reviews = spark.createDataFrame(imdb_reviews) # create spark dataframe


spark_imdb_reviews.printSchema()
root
 |-- input1: string (nullable = true)
 |-- input2: long (nullable = true)

And these are my custom functions -

def compute_sentiment_score(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.score)

def compute_sentiment_magnitude(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.magnitude)

Here is how I try to use the foreachPartition() method -

create_rdd = spark_imdb_reviews.select("input1").rdd # create RDD
print(create_rdd.getNumPartitions()) # print the partitions
print(create_rdd.take(1)) # display data
new_rdd = create_rdd.foreachPartition(compute_sentiment_score) # compute score

Which gives this output and an error -

8
[Row(input1="first think another Disney movie, might good, it's kids movie. watch it, can't help enjoy it. ages love movie. first saw movie 10 8 years later still love it! Danny Glover superb could play part better. Christopher Lloyd hilarious perfect part. Tony Danza believable Mel Clark. can't help, enjoy movie! give 10/10!")]

File "<ipython-input-106-e3fd65ce75cc>", line 3, in compute_sentiment_score
TypeError: <itertools.chain object at 0x11ab7f198> has type itertools.chain, but expected one of: bytes, unicode

Solution

  • There are two similar functions: foreachPartition and mapPartitions.

    Both expect another function as a parameter (here compute_sentiment_score). That function receives the content of a partition in the form of an iterator. The text parameter in the question is therefore not a single string but an iterator, which is why passing it directly to types.Document fails with the itertools.chain type error; it has to be iterated over inside compute_sentiment_score.

    The difference between foreachPartition and mapPartitions is that foreachPartition is a Spark action while mapPartitions is a transformation. This means the code called by foreachPartition is executed immediately and the RDD remains unchanged, while mapPartitions can be used to create a new RDD. To store the calculated sentiment scores, mapPartitions should be used:

    def compute_sentiment_score(itr_text):
        # set up the things that are expensive and should be prepared only once per partition
        client = language.LanguageServiceClient()

        # run the loop for each row of the partition
        for row in itr_text:
            # each element is a Row; access the question's text column via row.input1
            document = types.Document(content=row.input1, type=enums.Document.Type.PLAIN_TEXT, language='en')
            sentiment = client.analyze_sentiment(document=document).document_sentiment
            yield (row.input1, sentiment.score)

    df_with_score = spark_imdb_reviews.rdd.mapPartitions(compute_sentiment_score)
    df_with_score.foreach(print)
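
    For contrast, a side-effect-only version of the same loop using foreachPartition could look like the sketch below (it reuses the imports and the spark_imdb_reviews dataframe from the question; print_sentiment_scores is an illustrative name, and on a cluster the print output lands in the executor logs, not on the driver):

    def print_sentiment_scores(itr_text):
        # again, one expensive client per partition
        client = language.LanguageServiceClient()
        for row in itr_text:
            document = types.Document(content=row.input1, type=enums.Document.Type.PLAIN_TEXT, language='en')
            sentiment = client.analyze_sentiment(document=document).document_sentiment
            print(row.input1, sentiment.score)  # side effect only, nothing is returned

    # runs immediately (action) and returns None, so the scores are not stored anywhere
    spark_imdb_reviews.rdd.foreachPartition(print_sentiment_scores)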
    

    In these examples client = language.LanguageServiceClient() is created once per partition. With 8 partitions that still means 8 clients, so the number of partitions may have to be reduced, for example with coalesce; see the sketch below.
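
    A minimal sketch of that (the target of 2 partitions is an arbitrary choice for illustration, not part of the original answer):

    # reduce the 8 partitions to 2, so only 2 LanguageServiceClient instances are created
    scores = spark_imdb_reviews.rdd.coalesce(2).mapPartitions(compute_sentiment_score)
    scores.foreach(print)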