
Apache Spark Enron DataSet


I am trying to analyze the Enron dataset with Apache Spark. I want to extract the From and To addresses of each email. First, I created an RDD using the following function:

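def utf8_decode_and_filter(rdd):
    """Decode each record's value (x[1]) as UTF-8 and drop records that fail to decode."""
    def utf_decode(s):
        try:
            return str(s, 'utf-8')
        except (UnicodeDecodeError, TypeError):
            # Values that are not valid UTF-8 bytes become None and are filtered out below
            return None
    return rdd.map(lambda x: utf_decode(x[1])).filter(lambda x: x is not None)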
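To check the decode-and-filter logic without a cluster, here is a minimal sketch that applies the same steps to a plain Python list of (key, value) pairs instead of an RDD. It assumes the SequenceFile values arrive as bytes-like objects (which is what str(s, 'utf-8') expects); the sample records are made up for illustration.

```python
# Hypothetical (key, value) records standing in for sc.sequenceFile(...) output.
# The second value is not valid UTF-8 (0xFF never appears in UTF-8).
records = [
    ("msg1", b"From: alice@enron.com\r\nTo: bob@enron.com\r\n"),
    ("msg2", b"\xff\xfe not utf-8"),
    ("msg3", bytearray(b"From: carol@enron.com\r\nTo: dave@enron.com\r\n")),
]

def utf_decode(s):
    """Decode bytes/bytearray as UTF-8; return None if decoding fails."""
    try:
        return str(s, 'utf-8')
    except (UnicodeDecodeError, TypeError):
        return None

# Same pipeline as rdd.map(lambda x: utf_decode(x[1])).filter(...), on a local list.
decoded = [d for d in (utf_decode(v) for _, v in records) if d is not None]
print(len(decoded))
```

Note that a value that is already a str would also be dropped, because str(s, 'utf-8') raises TypeError for str input.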

I called this function on the SequenceFile loaded with sc.sequenceFile:

data = utf8_decode_and_filter(sc.sequenceFile('/user/ufac001/project1920/samples'))

When I do:

data.collect()

I can see the data: it appears to be a list of strings, each one an email exchanged between employees.

Next, to extract email triples, I wrote the following function:

def xml_to_emails(s):
    print(s)
    emailed = []

    return s

rdd = data.flatMap(lambda x: xml_to_emails(x)).map(lambda word: (word, 1)).reduceByKey(lambda a,b:a+b)

My issue is that I cannot extract the emails. The print(s) call inside xml_to_emails outputs nothing, and when I run collect() on rdd, I only get tuples of a single letter and a number.
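The letter/number tuples come from flatMap: xml_to_emails returns the whole string, and flatMap iterates over whatever the function returns, which for a string means its individual characters. The (word, 1) and reduceByKey steps then count characters. Separately, print() inside a transformation runs in the worker processes rather than the driver, so its output typically ends up in executor logs or the terminal that launched Spark, not in the notebook or shell where collect() is called. The sketch below emulates map and flatMap with plain Python (itertools.chain standing in for Spark's flattening, Counter standing in for reduceByKey) to show the difference; the two sample strings are made up.

```python
from collections import Counter
from itertools import chain

# Two made-up "emails" standing in for the decoded RDD.
data = ["To: bob", "To: amy"]

def xml_to_emails(s):
    return s  # returns the whole string, as in the question

# flatMap: flattens each returned iterable; a string flattens into characters.
flat = list(chain.from_iterable(xml_to_emails(x) for x in data))

# map: exactly one output element per input element.
mapped = [xml_to_emails(x) for x in data]

# Emulates .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts = Counter(flat)

print(flat[:4])
print(mapped)
print(counts["o"])
```

In Spark, returning a single tuple per email and using map, or returning a list of tuples and using flatMap, avoids the character split.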

How do I extract emails from this rdd?

Please be nice; I am new to Spark.


Solution

  • I found a solution like the one below, which uses map instead of flatMap:

    import re

    def first_header(s, name):
        # Value of the first "Name: ..." header line, or None if that header is missing.
        # ^ with re.MULTILINE anchors at the start of a line, so "To:" cannot match "X-To:".
        m = re.search(r'^' + re.escape(name) + r':(.*)$', s, re.MULTILINE)
        return m.group(1).replace("\r", "").strip() if m else None

    def enron_only(address):
        # Keep the address only if the part after "@" contains "enron"; otherwise "".
        if address and '@' in address and 'enron' in address.split('@')[1]:
            return address
        return ""

    def xml_to_emails(s):
        fromemail = enron_only(first_header(s, 'From'))
        toemail = enron_only(first_header(s, 'To'))  # guarded in case a message has no To: header
        date = first_header(s, 'Date')
        ccemail = first_header(s, 'Cc')    # None if absent
        bccemail = first_header(s, 'Bcc')  # None if absent
        return (fromemail, (toemail, ccemail, bccemail), date)

    # map (not flatMap): each email string produces exactly one tuple.
    # Apply it to the decoded RDD data, not to the character-count rdd.
    emails = data.map(xml_to_emails)
    

    With this, I was able to extract the emails.
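An alternative to the regular expressions, and not what the solution above uses, is Python's standard email module: each Enron message is an RFC 822-style text (headers, a blank line, then the body), so email.message_from_string can parse the headers and email.utils.getaddresses can split comma-separated address lists. This is a minimal sketch on a made-up message; the function name parse_headers is hypothetical. In Spark, it could be applied with data.map(parse_headers).

```python
import email
from email.utils import getaddresses

# Hypothetical message in the Enron maildir layout (headers, blank line, body).
SAMPLE = (
    "Message-ID: <1.JavaMail.evans@thyme>\r\n"
    "Date: Mon, 14 May 2001 16:39:00 -0700 (PDT)\r\n"
    "From: alice@enron.com\r\n"
    "To: bob@enron.com, carol@example.com\r\n"
    "Subject: test\r\n"
    "X-To: Bob, Carol\r\n"
    "\r\n"
    "Body text.\r\n"
)

def parse_headers(s):
    """Return (from, (to_list, cc_list, bcc_list), date) for one raw email string."""
    msg = email.message_from_string(s)

    def addrs(name):
        # get_all returns the default ([]) when the header is absent;
        # getaddresses yields (realname, address) pairs, and we keep the addresses.
        return [addr for _, addr in getaddresses(msg.get_all(name, []))]

    return (msg["From"], (addrs("To"), addrs("Cc"), addrs("Bcc")), msg["Date"])

print(parse_headers(SAMPLE))
```

Unlike the single To string in the regex version, this returns every recipient as a separate list element, which is convenient if you later want one (from, to) pair per recipient, for example with flatMap.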