amazon-s3, airflow, amazon-emr

Can't get Apache Airflow to write to S3 using EMR Operators


I am using the Airflow EMR Operators to create an AWS EMR cluster that runs a Jar file stored in S3 and then writes its output back to S3. The cluster seems to run the job from the S3 Jar fine, but nothing is ever written to the S3 output location. When I run the same job as an AWS EMR CLI Bash command, the output is written to S3 as expected, but I need to do it through the Airflow EMR Operators. I have the S3 output directory set both in the Airflow step config and in the environment config inside the Jar file, and the Operators still do not write to it.

Here is the code I have for my Airflow DAG:

from datetime import datetime, timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

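# Default arguments applied to every task in the DAG.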
DEFAULT_ARGS = {
    'owner': 'AIRFLOW_USER',
    'depends_on_past': False,
    'start_date':  datetime(2019, 9, 9),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False
}

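# Step submitted to the cluster: spark-submit the application Jar from S3 in
# cluster mode, passing the S3 output directory as the application's argument.
# Note: spark.yarn.submit.waitAppCompletion=false makes the step complete as soon
# as YARN accepts the application, not when the Spark job itself finishes.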
RUN_STEPS = [
    {
        "Name": "run-custom-create-emr",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster", "--master", "yarn", "--conf",
                "spark.yarn.submit.waitAppCompletion=false", "--class", "CLASSPATH",
                "s3://INPUT_JAR_FILE",
                "s3://OUTPUT_DIR"
            ]
        }
    }
]

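# Cluster configuration; these values override the job flow defaults stored in
# the 'emr_connection_CustomCreate' Airflow connection.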
JOB_FLOW_OVERRIDES = {
    "Name": "JOB_NAME",
    "LogUri": "s3://LOG_DIR/",
    "ReleaseLabel": "emr-5.23.0",
    "Instances": {
        "Ec2KeyName": "KP_USER_NAME",
        "Ec2SubnetId": "SUBNET",
        "EmrManagedMasterSecurityGroup": "SG-ID",
        "EmrManagedSlaveSecurityGroup": "SG-ID",
        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.large",
                "InstanceCount": 1
            },
            {
                "Name": "Slave nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "m4.large",
                "InstanceCount": 1
            }
        ],
        "TerminationProtected": True,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    "Applications": [
        {
            "Name": "Spark"
        },
        {
            "Name": "Ganglia"
        },
        {
            "Name": "Hadoop"
        },
        {
            "Name": "Hive"
        }
    ],
    "JobFlowRole": "ROLE_NAME",
    "ServiceRole": "ROLE_NAME",
    "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
    "EbsRootVolumeSize": 10,
    "Tags": [
        {
            "Key": "Country",
            "Value": "us"
        },
        {
            "Key": "Environment",
            "Value": "dev"
        }
    ]
}

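# Manually triggered DAG (no schedule); each run times out after two hours.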
dag = DAG(
    'AWS-EMR-JOB',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None
)

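# Create the EMR cluster; the new job flow id is pushed to XCom as 'return_value'.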
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_connection_CustomCreate',
    dag=dag
)

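# Add the Spark step to the cluster created above; the list of step ids is pushed to XCom.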
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=RUN_STEPS,
    dag=dag
)

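# Wait for the submitted step to reach a terminal state.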
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

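# Terminate the cluster once the step has finished.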
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

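# Linear pipeline: create cluster -> add step -> wait for step -> terminate cluster.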
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

Does anyone have any ideas on how I can solve this problem? Any help would be appreciated.


Solution

  • I believe I have solved my problem. After digging through all of the local Airflow logs and the EMR logs in S3, I found a Hadoop memory exception, so I increased the number of cores the EMR cluster runs on, and it seems to work now. A rough sketch of that kind of change is below.
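Concretely, the fix amounts to giving YARN more capacity in JOB_FLOW_OVERRIDES. The instance type and core node count below are illustrative assumptions rather than my exact final values:

# Illustrative resizing of the instance groups; the instance type and core node
# count are assumptions, not necessarily the values I ended up with.
JOB_FLOW_OVERRIDES["Instances"]["InstanceGroups"] = [
    {
        "Name": "Master nodes",
        "Market": "ON_DEMAND",
        "InstanceRole": "MASTER",
        "InstanceType": "m4.xlarge",  # more vCPUs and memory than m4.large
        "InstanceCount": 1
    },
    {
        "Name": "Slave nodes",
        "Market": "ON_DEMAND",
        "InstanceRole": "CORE",
        "InstanceType": "m4.xlarge",  # more vCPUs and memory than m4.large
        "InstanceCount": 2            # an extra core node for YARN headroom
    }
]

Tuning the spark-submit memory flags in the step's Args (for example --executor-memory and --driver-memory) is the other common way around the same error.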