I have the following code to run an EMR job, and it runs successfully. I also want to monitor its running status. I tried the DescribeJobFlows
API, but according to http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/AmazonElasticMapReduceClient.html it is deprecated.
Could anybody advise on the best practice for monitoring the progress of an EMR job?

    import java.util.Arrays;
    import java.util.List;

    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
    import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
    import com.amazonaws.services.elasticmapreduce.model.*;
    import com.amazonaws.services.elasticmapreduce.util.StepFactory;

    public class EmrJobRunner {
        public static void main(String[] args) {
            // args is [input_file_path, output_directory], make sure output_directory does not exist
            String inputFilePath = "s3://mybucket/emr/input";
            String outputDirectory = "s3://mybucket/emr/output/" + System.currentTimeMillis();
            String jarName = "WordCount.jar";
            String jarPath = "s3://mybucket/emr/" + jarName;
            String logPath = "s3://mybucket/emr/logs";
            AWSCredentials credentials = new BasicAWSCredentials("pub_key", "sec_key");
            StepFactory stepFactory = new StepFactory();
            AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
            StepConfig enableDebugging = new StepConfig()
                .withName("Enable debugging")
            StepConfig installHive = new StepConfig()
                .withName("Install Hive")
            StepConfig runScript = new StepConfig()
                .withName("Run Script")
            List<String> jarArgs = Arrays.asList(inputFilePath, outputDirectory);
            HadoopJarStepConfig jarCfg = new HadoopJarStepConfig()
            StepConfig runJar = new StepConfig()
            JobFlowInstancesConfig instanceCfg = new JobFlowInstancesConfig()
            List<StepConfig> steps = Arrays.asList(enableDebugging, installHive, runScript, runJar);
            RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("My EMR Job Flow")
            RunJobFlowResult result = emr.runJobFlow(request);
            // The SDK marks describeJobFlows as deprecated:
            // DescribeJobFlowsResult jobFlowDescResult = emr.describeJobFlows(describeJobFlowsRequest);
        }
    }

Since DescribeJobFlows is deprecated, polling the cluster status with DescribeCluster is an alternative way to monitor the job's progress.
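
    RunJobFlowResult runJobResult = emr.runJobFlow(runJobFlowRequest);
    System.out.printf("Run JobFlowId is: %s\n", runJobResult.getJobFlowId());

    while (true) {
        // The job flow ID returned by runJobFlow is also the cluster ID.
        DescribeClusterRequest desc = new DescribeClusterRequest()
            .withClusterId(runJobResult.getJobFlowId());
        DescribeClusterResult clusterResult = emr.describeCluster(desc);
        Cluster cluster = clusterResult.getCluster();
        String status = cluster.getStatus().getState();
        System.out.printf("Status: %s\n", status);
        if (status.equals(ClusterState.TERMINATED.toString())
                || status.equals(ClusterState.TERMINATED_WITH_ERRORS.toString())) {
            break; // the cluster has finished, successfully or not
        }
        try {
            TimeUnit.SECONDS.sleep(30); // polling interval is an assumed value; tune as needed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop polling
            break;
        }
        // maybe other handling
    }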
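
If you want the polling loop to give up eventually instead of spinning forever, one option is to pull it into a small helper with a timeout. The following is only a sketch of that idea, not something from the SDK: the class name `ClusterStatePoller`, the method `waitForTerminalState`, and the timeout parameter are all made up for illustration, and the state is fetched through a plain `Supplier<String>` so the example runs without AWS credentials. In real use the supplier would presumably wrap the `emr.describeCluster(desc).getCluster().getStatus().getState()` call from the loop above.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper (not part of the AWS SDK): polls a state supplier until a terminal state.
final class ClusterStatePoller {
    // State names that end polling; they mirror the two ClusterState values checked in the loop above.
    static final Set<String> TERMINAL_STATES = Set.of("TERMINATED", "TERMINATED_WITH_ERRORS");

    /** Calls fetchState repeatedly until it returns a terminal state or the timeout elapses. */
    static String waitForTerminalState(Supplier<String> fetchState, Duration interval, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            String state = fetchState.get();
            System.out.printf("Status: %s%n", state);
            if (TERMINAL_STATES.contains(state)) {
                return state;
            }
            // Overflow-safe comparison of nanoTime values against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Cluster still in state " + state + " after " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the real EMR call; these states are canned values for the demo only.
        Iterator<String> fake = List.of("STARTING", "RUNNING", "TERMINATED").iterator();
        String last = waitForTerminalState(fake::next, Duration.ofMillis(10), Duration.ofSeconds(5));
        System.out.println("Final state: " + last);
    }
}
```

Running `main` prints the three canned states and then `Final state: TERMINATED`. Whether a timeout makes sense depends on how long your job flow normally runs, so treat the values here as placeholders.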