hadoop-yarn apache-flink amazon-emr

Error while deploying Flink custom JAR file in AWS EMR


I want to deploy a custom Flink JAR file to a new AWS EMR cluster. Here is a summary of how I created the cluster.

  • Step 1: Software and Steps -

    • Created an AWS EMR cluster (EMR release version 5.17.0) and selected Flink 1.5.2 as the software configuration.
    • Entered the following configuration JSON:

      [ { "Classification": "flink-conf", "Properties": { "jobmanager.heap.mb": "3072", "taskmanager.heap.mb": "51200", "taskmanager.numberOfTaskSlots": "2", "taskmanager.memory.preallocate": "false",
      "parallelism.default": "1" } } ]

  • Step 2: Hardware - No change to the hardware configuration. By default there are 1 master, 2 core, and 0 task instances, all of type m3.xlarge.

  • Step 3: General Cluster Settings - No change here.
  • Step 4: Security - Provided my EC2 key pair.

Once the cluster was ready, I SSHed into the EC2 machine and tried to deploy the custom JAR file. Below are the errors I got each time I tried to deploy it via the CLI.

1)

flink run -m yarn-cluster -yn 2 -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar

Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2018-10-09 06:30:36,766 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at ip-IPADDRESS.ec2.internal/IPADDRESS:8032
2018-10-09 06:30:36,909 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2018-10-09 06:30:37,168 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Killing YARN application

2)

flink run -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar

Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve standalone cluster
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.util.ConfigurationException: Config parameter 'Key: 'jobmanager.rpc.address' , default: null (deprecated keys: [])' is missing (hostname/address of JobManager to connect to).
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getJobManagerAddress(HighAvailabilityServicesUtils.java:141)
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:81)
    at org.apache.flink.client.program.ClusterClient.<init>(ClusterClient.java:158)
    at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:183)
    at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:156)
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
    ... 10 more

I also tried to deploy via the AWS web UI, and the JAR failed to deploy there as well.

So, basically, I want to deploy the custom JAR to the Flink YARN cluster. I am not sure what I am missing in the Flink YARN configuration or elsewhere. Thanks in advance for any help.


Solution

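  • You should reduce the memory allocated to the TaskManager. With taskmanager.heap.mb set to 51200, you are requesting roughly 50 GB per TaskManager, whereas a single m3.xlarge machine has only 15 GB of memory and the two core nodes together have only 30 GB. YARN cannot provide a container that large, which is consistent with the "Killing YARN application" message in your first attempt.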
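
    The arithmetic can be checked with a short shell sketch. The numbers for the requested heap, instance memory, and node count come from the question; the suggested TaskManager size of 4096 MB is only an illustrative assumption, not a tuned value. YARN usually exposes less than the full instance memory to containers, so it is worth checking yarn.nodemanager.resource.memory-mb on the cluster before picking a final number. The script only prints a candidate command; it does not submit anything.

```shell
#!/usr/bin/env bash
# Values taken from the question.
requested_tm_mb=51200          # taskmanager.heap.mb from the flink-conf classification
node_ram_mb=$((15 * 1024))     # one m3.xlarge has 15 GB of memory
core_nodes=2                   # default number of core instances

# Total physical memory across the core nodes.
cluster_ram_mb=$((node_ram_mb * core_nodes))

# A single TaskManager container must fit on a single node.
if [ "$requested_tm_mb" -gt "$node_ram_mb" ]; then
  fits="no"
else
  fits="yes"
fi
echo "Requested per TaskManager: ${requested_tm_mb} MB"
echo "Memory per node: ${node_ram_mb} MB, across core nodes: ${cluster_ram_mb} MB"
echo "Fits on one node: ${fits}"

# Assumption: 4096 MB is a hypothetical, conservative TaskManager size.
# -ytm sets the memory per TaskManager container in MB on the command line.
suggested_tm_mb=4096
cmd="flink run -m yarn-cluster -yn 2 -ytm ${suggested_tm_mb} -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar"
echo "Candidate command: ${cmd}"
```

    Alternatively, lower taskmanager.heap.mb in the configuration JSON when creating the cluster, so that the command from the first attempt works unchanged.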