I am trying to write a MapReduce job using Python's MRJob package. The job processes ~36,000 files stored in S3, each ~2MB. When I run the job locally (after downloading the S3 bucket to my computer), it takes approximately 1 hour. However, when I run it on EMR it takes much longer (I stopped it at 8 hours, when the mapper was 10% complete). I have attached the code for my mapper_init and mapper below. Does anyone know what would cause an issue like this, and how to fix it? I should also note that when I limit the input to a sample of 100 files, it works fine.
# Imports needed by these methods (at the top of the module):
import re
from nltk.corpus import stopwords

def mapper_init(self):
    """
    Set class variables that will be useful to our mapper:
        filename: the path and filename to the current recipe file
        previous_line: the line previously parsed. We need this because the
            ingredient name is in the line after the tag
    """
    # self.filename = os.environ["map_input_file"]  # Not currently used
    self.previous_line = "None yet"

    # Determining if an item is in a list is O(n), while determining if an
    # item is in a set is O(1), so keep the stopwords in a set
    self.stopwords = set(stopwords.words('english'))
def mapper(self, _, line):
    """
    Takes a line from an html file and yields ingredient words from it

    Given a line of input from an html file, we check to see if it
    contains the identifier that it is an ingredient. Due to the
    formatting of our html files from allrecipes.com, the ingredient name
    is actually found on the following line. Therefore, we save the
    current line so that it can be referenced in the next pass of the
    function to determine if we are on an ingredient line.

    :param line: a line of text from the html file as a str
    :yield: a tuple containing each word in the ingredient as well as a
        counter for each word. The counter is not currently being used,
        but is left in for future development. e.g. "chicken breast" would
        yield "chicken" and "breast"
    """
    # TODO is there a better way to get the tag?
    if re.search(r'span class="ingredient-name" id="lblIngName"',
                 self.previous_line):
        self.previous_line = line
        line = self.process_text(line)
        line_list = set(line.split())
        for word in line_list:
            if word not in self.stopwords:
                yield (word, 1)
    else:
        self.previous_line = line
        yield ('', 0)
The problem is that you have a very large number of small files. Hadoop does not handle lots of small files well: each ~2MB file becomes its own input split and map task, so per-task overhead dominates the job. When you download the files to your computer and run locally, that overhead does not exist, which is why the local run is faster. Add a step that uses S3DistCp to copy the files to the EMR cluster, and have S3DistCp aggregate the small files into files of roughly 128MB. Once the files have been copied with S3DistCp, read them from HDFS (a sketch of such a step is below).
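As a rough illustration only (not part of the original answer), here is a minimal sketch of adding an S3DistCp aggregation step to a running cluster with boto3. The cluster id, S3/HDFS paths, and the --groupBy pattern are placeholders to replace with your own; you could also add the equivalent step through the EMR console or the AWS CLI instead.

# Sketch only: add an S3DistCp step to an existing EMR cluster via boto3.
# The cluster id, bucket paths, and the --groupBy regex are placeholders.
import boto3

emr = boto3.client('emr')

emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',          # your cluster id
    Steps=[{
        'Name': 'Aggregate small recipe files',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # runs s3-dist-cp on the cluster
            'Args': [
                's3-dist-cp',
                '--src', 's3://my-recipe-bucket/html/',  # placeholder source bucket
                '--dest', 'hdfs:///recipes/',            # read from HDFS afterwards
                # Group all matching files so they get concatenated together
                '--groupBy', r'.*(\.html).*',
                # Aim for ~128MB output files instead of thousands of ~2MB ones
                '--targetSize', '128',
            ],
        },
    }],
)

After this step finishes, point the job's input at hdfs:///recipes/ (or whichever --dest you chose) so each map task reads a ~128MB file rather than a single ~2MB object from S3.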