
Creating EMR using Boto fails


I am trying to create an EMR cluster from Python using the boto library. I have tried a few things, but the end result is always "Shut down as step failed". I also tried running the word-count example code that Amazon supplies, and it still fails.

When I checked the logs, I found that EMR could not find where the mapper is located:

s3n://elasticmapreduce/samples/wordcount/wordSplitter.py": error=2, No such file or directory

That led me to this response from Amazon that I found on another site:

Hello,

Beginning with AMI 3.x with Hadoop 2 and moving forward, EMR Hadoop streaming will support the standard Hadoop-style reference for streaming jobs.

This means that S3-referenced mappers and reducers will need to be placed into the "-files" argument.

For example,

elastic-mapreduce --create --ami-version 3.0.1 --instance-type m1.large --log-uri s3n://mybucket/logs --stream --mapper s3://elasticmapreduce/samples/wordcount/wordSplitter.py --input s3://mybucket/input/alice.txt --output s3://mybucket/output --reducer aggregate

becomes:

elastic-mapreduce --create --ami-version 3.0.1 --instance-type m1.large --log-uri s3n://mybucket/logs --stream --arg "-files" --arg "s3://elasticmapreduce/samples/wordcount/wordSplitter.py" --mapper wordSplitter.py --input s3://mybucket/input/alice.txt --output s3://mybucket/output --reducer aggregate

Now I am trying to see whether that is the solution for me, but I can't understand how to set the "-files" flag and its argument as described.

This is the current code I have:

self._steps.append(StreamingStep(
    name=step_description,
    mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
    reducer='aggregate',
    input='s3n://elasticmapreduce/samples/wordcount/input',
    output='s3n://'+'test'))

conn.run_jobflow(
    availability_zone='us-east-1b',
    name=job_description,
    master_instance_type='m3.xlarge',
    slave_instance_type='m3.xlarge',
    num_instances=3,
    action_on_failure='TERMINATE_JOB_FLOW',
    keep_alive=True,
    log_uri='s3://'+"logs",
    ami_version="3.6.0",
    steps=self._steps,
    bootstrap_actions=self._actions,
    visible_to_all_users=True
)

--------------edit---------------
It looks like this is the answer: I lowered the ami_version to 2.4.11, the last AMI series before the Hadoop 2 line, and the same code works now. I don't really know if I actually need the latest Hadoop release, probably not, but it bugs me that I am not using the latest release available from Amazon.

--------------edit2---------------
Found the solution:

# Create a list and insert two elements:
# the first element is the argument name '-files';
# the second is the full path to both the mapper and the reducer, separated by a comma.
# Passing the flag and the paths together as a single string fails.
step_args = list()
step_args.append('-files')
step_args.append('s3://<map_full_path>/<map_script_name>,s3://<reduce_full_path>/<reduce_script_name>')

# Pass step_args to the StreamingStep constructor
self._steps.append(StreamingStep(
    name=step_description,
    mapper='<map_script_name>',
    reducer='<reduce_script_name>',
    input='s3n://elasticmapreduce/samples/wordcount/input',
    output='s3n://'+'test',
    step_args=step_args))

conn.run_jobflow(
    availability_zone='us-east-1b',
    name=job_description,
    master_instance_type='m3.xlarge',
    slave_instance_type='m3.xlarge',
    num_instances=3,
    action_on_failure='TERMINATE_JOB_FLOW',
    keep_alive=True,
    log_uri='s3://'+"logs",
    ami_version="3.6.0",
    steps=self._steps,
    bootstrap_actions=self._actions,
    visible_to_all_users=True
)
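
In case it helps, here is a minimal, self-contained sketch of the same approach outside of my class. The bucket names, script placeholders, region, and step/cluster names below are illustrative only, not my real values:

# Minimal end-to-end sketch (placeholders: buckets, script names, region)
import boto.emr
from boto.emr.step import StreamingStep

# open a connection to EMR in the region where the cluster should run
conn = boto.emr.connect_to_region('us-east-1')

# '-files' plus a comma-separated list of the mapper and reducer locations on S3
step_args = ['-files',
             's3://<map_full_path>/<map_script_name>,s3://<reduce_full_path>/<reduce_script_name>']

# the mapper and reducer are referenced by bare file name; '-files' ships them to the nodes
step = StreamingStep(
    name='wordcount streaming step',
    mapper='<map_script_name>',
    reducer='<reduce_script_name>',
    input='s3n://elasticmapreduce/samples/wordcount/input',
    output='s3n://<output_bucket>/wordcount-output',
    step_args=step_args)

# launch the cluster with the streaming step
jobflow_id = conn.run_jobflow(
    name='wordcount example',
    availability_zone='us-east-1b',
    master_instance_type='m3.xlarge',
    slave_instance_type='m3.xlarge',
    num_instances=3,
    log_uri='s3://<log_bucket>/logs',
    ami_version='3.6.0',
    steps=[step],
    visible_to_all_users=True)
print(jobflow_id)

As I understand it, the -files option puts both scripts into Hadoop's distributed cache on every node, which is why the mapper and reducer can then be referenced by their bare file names.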

Hope it helps somebody...

