I want to create a new EMR cluster, run a PySpark job on it, and destroy it. Ideally I'd like to do this by adding a step when creating the cluster. The command I would run locally to start the job looks like this:
spark-submit calculate.py --input x.csv --output output
What I don't understand is how I can make sure that calculate.py is already available on the master node. I saw a reference to reading the Python script from an S3 bucket here, but I couldn't get that to work.
Now I have separate commands for creating the cluster, putting the script on the master node, and adding the steps. The problem with this is that the cluster keeps running after the job step finishes.
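Roughly, that separate-calls approach looked like the sketch below (here written with boto3; it is not my exact code, the copy of calculate.py to the master node over SSH is left out, and the parameters are placeholders):

import boto3

client = boto3.client('emr', region_name='eu-central-1')

# 1. Create a cluster that stays alive, so the script can be copied and a step added later.
response = client.run_job_flow(
    Name='My job',
    ReleaseLabel='emr-4.0.0',
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 2,
        # the cluster keeps running even when there are no more steps
        'KeepJobFlowAliveWhenNoSteps': True
    },
    Applications=[{'Name': 'Spark'}],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole')
cluster_id = response['JobFlowId']

# 2. (copy calculate.py to the master node, e.g. with scp -- not shown here)

# 3. Add the Spark step once the script is in place.
client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[{
        'Name': 'Calculate step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '/home/hadoop/calculate.py',
                     '--input', 'x.csv', '--output', 'output']
        }
    }])

# 4. After the step has finished (e.g. polled via describe_step), the cluster
#    still has to be terminated explicitly -- it does not go away on its own.
client.terminate_job_flows(JobFlowIds=[cluster_id])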
I solved this by adding an extra step that just calls hadoop fs -copyToLocal to download the files to the master node. I first tried doing the copy in a bootstrap action, but that failed because the hadoop command is not installed yet at that point, so it has to be a regular step.
Full working example using boto3:
import boto3

client = boto3.client('emr', region_name='eu-central-1')

# Bucket that holds the code/ prefix with calculate.py (placeholder name)
S3_BUCKET = 's3://my-bucket'

cluster_id = client.run_job_flow(
    Name='My job',
    LogUri='s3://my-bucket/emr-logs/',
    ReleaseLabel='emr-4.0.0',
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 2,
        'Ec2KeyName': 'my_key',
        'Ec2SubnetId': 'subnet-123456',
        # terminate the cluster once all steps have completed
        'KeepJobFlowAliveWhenNoSteps': False
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    Steps=[
        {
            # first step: download the scripts from S3 to the master node
            'Name': 'Copy files to master',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    'hadoop',
                    'fs',
                    '-copyToLocal',
                    '%s/code/*' % S3_BUCKET,
                    '/home/hadoop/'
                ]
            }
        },
        {
            # second step: run the PySpark job that is now on the local disk
            'Name': 'Calculate step',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    'spark-submit',
                    '/home/hadoop/calculate.py',
                    '--param',
                    'value'
                ]
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole')

print(cluster_id)
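If you also want the script to block until the job is done and the cluster is actually gone, boto3 ships waiters for EMR. A minimal sketch, reusing the client from above (the run_job_flow() response stored in cluster_id contains the JobFlowId):

# Wait until EMR reports the cluster as terminated; the waiter polls the
# cluster status and raises a WaiterError if the cluster does not terminate
# cleanly or the polling times out.
waiter = client.get_waiter('cluster_terminated')
waiter.wait(ClusterId=cluster_id['JobFlowId'])
print('All steps finished and the cluster has been terminated')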