Search code examples
javahadoopclasspathhadoop-yarnamazon-emr

EMR Yarn application submission via REST


I have an Hadoop cluster in AWS with YARN, to which I submit spark applications. I work via REST requests, submitting XML as specified in this documentation: YARN REST API. It works great for the regular cluster.

I'm currently doing a POC for working with EMR cluster instead of the usual one, where I use the existing REST commands and simply communicate with the internal YARN of the EMR via SSH, as specified here: Web access of internal EMR services. It works great for most of the REST commands, such as POST http://<rm http address:port>/ws/v1/cluster/apps/new-application, but when I submit a new application it fails immediately and reports that it cannot find the ApplicationMaster.

Log Type: stderr

Log Upload Time: Sun Feb 03 17:18:35 +0000 2019

Log Length: 88

Error: Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster

I suspected this is connected to the classpath somehow, and when I added to the REST submit application node the classpath flag with the EMR FS location of all the jars (/usr/lib/spark/jars/*), it finds the ApplicationMaster but then fails to find Jars in the core instances, with this strange error log:

Log Type: stderr

Log Upload Time: Thu Jan 31 15:11:21 +0000 2019

Log Length: 89

Error: Could not find or load main class .usr.lib.spark.jars.datanucleus-core-3.2.10.jar

The most unusual thing is the way it tries to describe the jar it cannot find, rather than the class. After further investigation, I found the reason: when the Java command is sent to the Core instance, it resolves the classpath to its three files: java -server -classpath /usr/lib/spark/jars/datanucleus-api-jdo-3.2.6.jar /usr/lib/spark/jars/datanucleus-core-3.2.10.jar /usr/lib/spark/jars/datanucleus-rdbms-3.2.9.jar ... and therefore tries to execute "/usr/lib/spark/jars/datanucleus-core-3.2.10.jar" as if it was a runnable. The problem is that if I try to change the classpath to be more specific, or if I try to remove it, the application fails again for not finding the ApplicationMaster.

The REST request I send to YARN is:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?> 
<application-submission-context> 
<application-id>application_1549270910165_0001</application-id> 
<application-name> .... some name .....</application-name> 
<queue>default</queue> 
<priority>0</priority> 
<am-container-spec> 
<local-resources> 
    <entry>

 ....... MANY MANY RESOURCES ......

        </value>
    </entry>

</local-resources> 
<environment> 
<entry> 
<key>SPARK_YARN_STAGING_DIR</key> 
<value>..... staging directory in our HDFS ..... </value> 
</entry> 
<entry> 
<key>CLASSPATH</key> 
<value>$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:/usr/lib/spark/jars/*:/usr/lib/spark/yarn/lib/*:%HADOOP_CONF_DIR%:%HAOOP_COMMON_HOME%/share/hadoop/common/*:%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__</value> 
</entry>
<entry>
<key>SPARK_USER</key>
<value>... user name ....</value>
</entry>
</environment>
<commands> 
<command>command=$JAVA_HOME/bin/java -classpath '/usr/lib/spark/jars/*' -server -Xmx5120M -Djava.io.tmpdir=$PWD/tmp '-XX:hashCode=0' '-Dlog4j.configuration=log4j-client.cfg' '-Dhdp.version=2.8.4' -Dspark.yarn.app.container.log.dir=&lt;LOG_DIR&gt; org.apache.spark.deploy.yarn.ApplicationMaster ... some jar and arguments ....  --properties-file $PWD/__spark_conf__/__spark_conf__.properties 1&gt; &lt;LOG_DIR&gt;/stdout 2&gt; &lt;LOG_DIR&gt;/stderr</command> 
</commands> 
</am-container-spec> 
<unmanaged-AM>false</unmanaged-AM> 
<max-app-attempts>1</max-app-attempts> 
<resource> 
<memory>5632</memory> 
<vCores>1</vCores> 
</resource> 
<application-type>SPARK</application-type> 
<keep-containers-across-application-attempts>false</keep-containers-across-application-attempts> 
<application-tags> 
<tag>.... product tag .....</tag> 
</application-tags> 
<log-aggregation-context/> 
<attempt-failures-validity-interval>1</attempt-failures-validity-interval> 
<reservation-id/> 
</application-submission-context>

I'd appreciate any leads.


Solution

  • After a long search, I found that the reason the application could not load the class org.apache.spark.deploy.yarn.ApplicationMaster is because this isn't the version of ApplicationMaster the EMR core instance uses - it uses org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster, which requires the CLASSPATH segment in the input to include /usr/lib/hadoop-yarn/*. I changed the two parameters in the input XML of the REST request and it succeeded to launch. I'll still need to configure the correct CLASSPATH for the EMR implementation to get the application to complete successfully, but the main challenge of this question is solved.

    Update: eventually I decided that adding a step to the EMR and using the arguments there is actually a much easier way to handle it. I added to the maven dependencies the EMR AWS Java SDK:

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-emr</artifactId>
        <version>1.11.486</version>
    </dependency>
    

    and added this code:

    AddJobFlowStepsResult result = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
                .withJobFlowId(clusterId)
                .withSteps(new StepConfig()
                        .withName(name)
                        .withActionOnFailure(ActionOnFailure.CONTINUE)
                        .withHadoopJarStep(new HadoopJarStepConfig()
                                .withJar("command-runner.jar")
                                .withArgs(stepargs))));
    

    The stepargs are taken from my original REST request, including the jars and the files to use - simply using spark-submit:

    List<String> stepargs = new ArrayList<String>();
    stepargs.add("spark-submit");
    stepargs.add("--class");
    stepargs.add(mainClass);
    stepargs.add("--deploy-mode");
    stepargs.add("cluster");
    stepargs.add("--master");
    stepargs.add("yarn");
    stepargs.add("--files");
    stepargs.add(files);
    stepargs.add("--jars");
    stepargs.add(jars);
    stepargs.add("--properties-file");
    stepargs.add(confFileName);
    stepargs.add(jar);
    Iterator<String> itr = args.iterator();
    while (itr.hasNext()) {
        String arg = itr.next();
        if (arg.equals("--arg")) {
            stepargs.add(itr.next());
        }
    }