Tags: apache-spark, pyspark, apache-spark-sql

Perform NLTK in pyspark


I am very new to PySpark and have developed a program that runs NLTK over files stored in HDFS. I'm using Spark 2.3.1. The steps are as follows:

1. Get the file from HDFS

2. Perform lemmatization

3. Remove punctuation marks

4. Convert the RDD to a DataFrame

5. Perform tokenization

6. Remove stop words

7. Explode the column data to create a unique row for each record

8. I want to keep all files' data in a single file, so I merge the output with the old file

9. Write this entire merged output to HDFS

10. Delete the old file and rename the Spark-created file to a different name (a short standalone sketch of this rename step follows this list)

11. I do this for the bigram and trigram files as well.
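
Steps 9 and 10 go through the Hadoop FileSystem API, reached via Spark's JVM gateway, because Spark always writes `part-*` files. A minimal standalone sketch of that delete-and-rename step, using the same calls as the full code below (directory and target name are just the ones from my setup):

# 'spark' is the SparkSession provided by the Zeppelin %pyspark interpreter
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

out_dir = "/user/zeppelin/achyuttest.csv/"   # output directory used below
target = "unigram.csv"                       # final file name

# Find the part-* file Spark just wrote into the output directory
part_file = [f.getPath().getName()
             for f in fs.listStatus(Path(out_dir))
             if f.getPath().getName().startswith("part-")][0]

# Remove the previous file (if any) and rename the new part file over it
fs.delete(Path(out_dir + target))
fs.rename(Path(out_dir + part_file), Path(out_dir + target))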

Here is my pyspark code.

%pyspark

import os
import pyspark
import csv
import nltk
import json
import string
import re

from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StringType

from nltk.corpus import stopwords
nltk.download('stopwords')

from pyspark.sql import SQLContext
from pyspark.sql.functions import explode,regexp_replace

import pandas
import hdfs



nltk.download('punkt')
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')


# conf = SparkConf().setAppName("PySpark App")
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

hdfs_dst_dir = "/user/zeppelin/achyuttest.csv/"
counter=0

# Lemmatization

def lemma(x):

    lemmatizer = WordNetLemmatizer()
    return lemmatizer.lemmatize(x)



for i in range(1,50001):
    data = sc.textFile('hdfs:///user/spark/Patentdata/ElectronicsPatents/Link\ {}/abstract.txt'.format(i), use_unicode=False)

    print(type(data))

    if data.isEmpty():
        continue


    else:
        lem_words = data.map(lemma)


        list_punct=list(string.punctuation)


        len_list = lem_words.collect()


        test_str = len_list[0]
        test_df = test_str.split(' ')


        data_df = data.map(lambda x: (x, )).toDF(['lem_words'])






# Perform Tokenizer

        tokenizer =  Tokenizer(inputCol="lem_words", outputCol="tokenized_data")
        outputdata = tokenizer.transform(data_df)
        outputdata = outputdata.select('tokenized_data')




    # Remove stop words

        remover = StopWordsRemover(inputCol='tokenized_data', outputCol='words_clean')
        outputdata = remover.transform(outputdata).select('words_clean')


# Explode one row into multiple rows, one row per value

        result_df = outputdata.withColumn("exploded", explode("words_clean")).select("exploded")

        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))



        print("Link  ========>",i)
#Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/unigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(['words_clean'])


            result_df = old_data_df.union(result_df)

        else:
            pass

#Write DataFrame to HDFS

        result_df.coalesce(1).write.mode('append').csv(hdfs_dst_dir)

        # 'spark' is the SparkSession provided by the Zeppelin %pyspark interpreter
        fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

    # Rename file

    #list files in the directory


        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))


    #filter name of the file starts with part-

        print("Get FileName")
        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

        print(file_name)
    #rename the file


        new_filename = "unigram.csv"

    # Remove Old file

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))



## Bigrams

        bigram = NGram(n=2, inputCol="words_clean", outputCol="bigrams")

        bigramDataFrame = bigram.transform(outputdata)




    # Explode one row into multiple rows, one row per value

        result_df = bigramDataFrame.withColumn("exploded", explode("bigrams")).select("exploded")
        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))


    #Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/bigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])


            result_df = old_data_df.union(result_df)

        else:
            pass


    # Write Output in file

        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

    # Rename file

    #list files in the directory

        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))

    #filter name of the file starts with part-

        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

    #rename the file

        new_filename = "bigram.csv"

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))





## TriGram

        trigram = NGram(n=3, inputCol="words_clean", outputCol="trigrams")

        trigramDataFrame = trigram.transform(outputdata)


    # Explode one row into multiple rows, one row per value

        result_df = trigramDataFrame.withColumn("exploded", explode("trigrams")).select("exploded")
        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))

    #Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/trigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])


            result_df = old_data_df.union(result_df)

        else:
            pass


#Save DataFrame in HDFS
        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

    # Rename file

    #list files in the directory

        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))

    #filter name of the file starts with part-

        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

    #rename the file

        new_filename = "trigram.csv"

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))

        counter = counter+1

I am running this code on 50K files, and Spark is taking far too long to finish (two days have passed and it is still running...).

I'm running HDP in a virtual machine (a single-node HDP Sandbox). Here is my system specification:

===> Guest OS:

  1. Memory: 12930 MB

  2. CPU: 6 CPUs

===> YARN specifications:

  1. Memory: 4608 MB

  2. Maximum container memory: 4608 MB

  3. Maximum container size (vCores): 4

  4. Number of virtual cores: 4

===> Zeppelin PySpark interpreter specification:

  1. spark.executor.memory: blank (which means 1g, as per the documentation)

So I have two questions.

  1. Is my code proper or not?
  2. Which values should I specify for YARN and the Zeppelin interpreter so that it runs fast and efficiently?

Thank you.


Solution

  • I'm answering my first question.

    In the old code I was creating a separate RDD for each file in the folder, so it was taking too much time (processing 3K files took about 19 hours).

    Now I read all the input files into a single RDD and perform every operation on that one RDD (the new code takes ~15 minutes to process the same 3K files).
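
    A minimal sketch of that key change (the HDFS path is the one from the question; everything else is illustrative):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # Old approach: one sc.textFile() call per file inside a Python loop,
    # which builds and schedules a separate tiny RDD/job for every file.
    # for i in range(1, 50001):
    #     data = sc.textFile('hdfs:///user/spark/Patentdata/ElectronicsPatents/Link {}/abstract.txt'.format(i))
    #     ...

    # New approach: one call with a glob pattern reads every abstract.txt
    # into a single RDD, so Spark plans one job over the whole dataset.
    data = sc.textFile('hdfs:///user/spark/Patentdata/ElectronicsPatents/*/abstract.txt')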


    Final Code

    Comments are included in the code for extra context.

    Patentdetect-local.py

    """
    To Run this code
    Set Pyspark_python 
    
    $ export PYSPARK_PYTHON=/usr/bin/python3
    $ pip install nltk
    
    
    RUN ON Spark::
    
    $ ./bin/spark-submit file_path/Patentdetect-local.py
    """
    
    
    
    import pyspark
    import nltk
    import string
    import os
    import re
    
    from pyspark import SparkContext
    from nltk.stem import WordNetLemmatizer

    # NLTK data used by sent_tokenize, stopwords and WordNetLemmatizer
    # (download is skipped if the corpora are already present)
    nltk.download('punkt')
    nltk.download('stopwords')
    nltk.download('wordnet')
    
    from pyspark.ml.feature import NGram
    from pyspark.sql.types import ArrayType,StructType,StructField,StringType
    from pyspark.sql.functions import explode,array,split,collect_list
    from pyspark.sql.window import Window
    from pyspark.sql import SparkSession
    
    
    sc = SparkContext.getOrCreate()
    
    spark = SparkSession.builder.appName('Spark Example').getOrCreate()
    
    
    Source_path="<path>/*/abstract.txt"
    
    Destination_path="<path>/spark-outputs/parquet/Electronics-50/"
    
    
    
    data=sc.textFile(Source_path)
    
    
    data.persist()
    lower_casetext = data.map(lambda x:x.lower())
    
    
    
    # splitting_rdd = lower_casetext.map(lambda x:x.split(" "))
    # print(splitting_rdd.collect())
    
    
    # Function to perform sentence tokenization
    def sent_TokenizeFunct(x):
        return nltk.sent_tokenize(x)
    
    sentencetokenization_rdd = lower_casetext.map(sent_TokenizeFunct)
    
    # Function to perform Word tokenization
    
    def word_TokenizeFunct(x):
        splitted = [word for line in x for word in line.split()]
        return splitted
    
    wordtokenization_rdd = sentencetokenization_rdd.map(word_TokenizeFunct)
    
    
    # Remove Stop Words
    
    def removeStopWordsFunct(x):
        from nltk.corpus import stopwords
        stop_words=set(stopwords.words('english'))
        filteredSentence = [w for w in x if not w in stop_words]
        return filteredSentence
    
    stopwordRDD = wordtokenization_rdd.map(removeStopWordsFunct)
    
    
    # Remove Punctuation marks
    
    def removePunctuationsFunct(x):
        list_punct=list(string.punctuation)
        filtered = [''.join(c for c in s if c not in list_punct) for s in x] 
        filtered_space = [s for s in filtered if s]  # drop empty strings left after stripping punctuation
        return filtered_space
    
    rmvPunctRDD = stopwordRDD.map(removePunctuationsFunct)
    
    # Perform Lemmatization
    
    def lemma(x):
    
        lemmatizer = WordNetLemmatizer()
    
        final_rdd = [lemmatizer.lemmatize(s) for s in x]
        return final_rdd
    
    lem_wordsRDD = rmvPunctRDD.map(lemma)
    
    # Join tokens
    
    # def joinTokensFunct(x):
    #     joinedTokens_list = []
    #     x = " ".join(x)
    #     return x
    
    # joinedTokensRDD = lem_wordsRDD.map(joinTokensFunct)
    
    
    ##Create DataFrame from RDD
    
    df = lem_wordsRDD.map(lambda x: (x, )).toDF(["features"])
    
    tokenized_df = df.withColumn("values", explode("features")).select("values")
    
    
    ## Write DataFrame Output
    
    # tokenized_df.write.mode('append').csv(Destination_path)
    
    ## Change File-name
    
    # for old_file_name in os.listdir(Destination_path):
    #   src = Destination_path+old_file_name
    #   dst = Destination_path+"unigram.csv"
        
    #   if old_file_name.startswith("part-"):
    #       os.rename(src, dst)
            # break
    
    
    ## For bigrams, the following commented line is enough
    ## (it additionally needs: from pyspark.sql import functions as F)
    # # tokenized_df.select(F.concat_ws(" ",F.col("values"),F.lead("values").over(Window.orderBy(F.lit(None))))).show()
    
    
    
    ## Create final DataFrame
    
    final_df = tokenized_df.select(collect_list("values").alias("listed_data"))
    
    # final_df.show(truncate=False)
    
    final_df.persist()
    
    
    ## Unigram
    
    unigram = NGram(n=1, inputCol="listed_data", outputCol="unigrams")
    
    unigramDataFrame = unigram.transform(final_df)
    
    unigram_FinalDataFrame = unigramDataFrame.withColumn("unigram_final",explode("unigrams")).select("unigram_final")
    
    
    ## Write DataFrame Outputs
    
    unigram_FinalDataFrame.write.mode('append').parquet(Destination_path)
    
    # Change filename
    for old_file_name in os.listdir(Destination_path):
        src = Destination_path+old_file_name
        dst = Destination_path+"unigram.parquet"
        
        if old_file_name.startswith("part-"):
            os.rename(src, dst)
    
    
    ## Bigram
    
    bigram = NGram(n=2, inputCol="listed_data", outputCol="bigrams")
    
    bigramDataFrame = bigram.transform(final_df)
    
    bigram_FinalDataFrame = bigramDataFrame.withColumn("bigram_final",explode("bigrams")).select("bigram_final")
    
    
    ## Write DataFrame Outputs
    
    bigram_FinalDataFrame.write.mode('append').parquet(Destination_path)
    
    ## Change filename
    for old_file_name in os.listdir(Destination_path):
        src = Destination_path+old_file_name
        dst = Destination_path+"bigram.parquet"
        
        if old_file_name.startswith("part-"):
            os.rename(src, dst)
            # break
    
    ## Trigram 
    
    trigram = NGram(n=3, inputCol="listed_data", outputCol="trigram")
    
    trigramDataFrame = trigram.transform(final_df)
    
    trigram_FinalDataFrame = trigramDataFrame.withColumn("trigram_final",explode("trigram")).select("trigram_final")
    
    ## Write DataFrame Outputs
    trigram_FinalDataFrame.write.mode('append').parquet(Destination_path)
    
    # Change Filename
    for old_file_name in os.listdir(Destination_path):
        src = Destination_path+old_file_name
        dst = Destination_path+"trigram.parquet"
        
        if old_file_name.startswith("part-"):
            os.rename(src, dst)
            # break
    
    final_df.unpersist()
    data.unpersist()
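
    To spot-check the results afterwards, reading the renamed Parquet files back should be enough (file names as produced by the rename loops above):

    # Read back the renamed Parquet outputs and show a few rows of each
    spark.read.parquet(Destination_path + "unigram.parquet").show(10, truncate=False)
    spark.read.parquet(Destination_path + "bigram.parquet").show(10, truncate=False)
    spark.read.parquet(Destination_path + "trigram.parquet").show(10, truncate=False)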