I am trying to analyze the Enron dataset on Apache Spark. I want to extract the email From and To addresses. First I created an RDD using the following function:
    def utf8_decode_and_filter(rdd):
        def utf_decode(s):
            try:
                return str(s, 'utf-8')
            except UnicodeDecodeError:
                return None  # dropped by the filter below
        return rdd.map(lambda x: utf_decode(x[1])).filter(lambda x: x is not None)
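For reference, the decode helper can be checked locally without Spark; the byte strings below are hypothetical stand-ins for SequenceFile record values:

```python
# Standalone check of the decode helper; the inputs are made-up examples,
# not actual records from the Enron SequenceFile.
def utf_decode(s):
    try:
        return str(s, 'utf-8')
    except UnicodeDecodeError:
        return None  # filtered out later in the pipeline

print(utf_decode(b'From: alice@enron.com'))  # valid UTF-8 decodes to str
print(utf_decode(b'\xff\xfe'))               # invalid UTF-8 yields None
```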
I called the above function on a Spark sequence file:
data = utf8_decode_and_filter(sc.sequenceFile('/user/ufac001/project1920/samples'))
When I do:
data.collect()
I can see the data as a list of strings containing the emails between the employees.
Now, to extract triples from the emails, I wrote the following function:
    def xml_to_emails(s):
        print(s)
        emailed = []
        return s

    rdd = data.flatMap(lambda x: xml_to_emails(x)).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
My issue is that I cannot extract the emails: the print(s) inside xml_to_emails outputs nothing, and when I run collect on rdd all I get back is tuples of a single letter and a number. How do I extract the emails from this RDD?
Please be nice, I am a newbie at Spark.
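(The (letter, number) tuples come from flatMap: a Python string is iterable, so flatMap flattens each returned string into its individual characters before the (word, 1) count runs. A quick local sketch, mimicking flatMap without Spark and using made-up data:)

```python
# flatMap applies the function to each element and flattens the result
# one level; returning a string therefore yields its characters.
def flat_map(f, xs):
    return [y for x in xs for y in f(x)]

emails = ["From: a@enron.com"]          # hypothetical decoded records
chars = flat_map(lambda s: s, emails)   # string -> individual characters
pairs = [(c, 1) for c in chars]         # what the (word, 1) map then sees
print(chars[:4])  # ['F', 'r', 'o', 'm']
print(pairs[0])   # ('F', 1)
```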
Edit: I found a solution: use map instead of flatMap.
    import re

    def xml_to_emails(s):
        emailed = []
        FromEmail = re.search(r'From: (.*)\n', s).group(1).replace("\r", "")  # can never be null
        fromemail = ""
        fromDomain = FromEmail.split('@')[1]
        if "enron" in fromDomain:
            fromemail = FromEmail
        Date = re.search(r'Date:(.*)', s).group(1).replace("\r", "")  # can never be null
        ToEmail = re.search(r'To:(.*)', s).group(1).replace("\r", "")  # can never be null
        toemail = ""
        toDomain = ToEmail.split('@')
        if len(toDomain) > 1:
            if "enron" in toDomain[1]:
                toemail = ToEmail
        CCEmail = re.search(r'Cc:(.*)', s)  # can be null
        if CCEmail is not None:
            CCEmail = CCEmail.group(1).replace("\r", "")
        BCCEmail = re.search(r'Bcc:(.*)', s)  # can be null
        if BCCEmail is not None:
            BCCEmail = BCCEmail.group(1).replace("\r", "")
        return (fromemail, (toemail, CCEmail, BCCEmail), Date)
    data = rdd.map(lambda x: xml_to_emails(x))  # .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

With this I was able to extract the emails.
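The header patterns can be sanity-checked outside Spark against a hypothetical raw header (the sample text below is made up in the maildir style, not taken from the dataset):

```python
import re

# Made-up header fragment (assumption, not real Enron data); headers end
# with \r\n, which is why the extraction strips the trailing "\r".
sample = ("Date: Mon, 14 May 2001 16:39:00 -0700\r\n"
          "From: alice@enron.com\r\n"
          "To: bob@enron.com\r\n")

from_email = re.search(r'From: (.*)\n', sample).group(1).replace("\r", "")
to_email = re.search(r'To:(.*)', sample).group(1).replace("\r", "").strip()
date = re.search(r'Date:(.*)', sample).group(1).replace("\r", "").strip()
print(from_email)  # alice@enron.com
print(to_email)    # bob@enron.com
print(date)        # Mon, 14 May 2001 16:39:00 -0700
```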