apache-flink pyflink

How do you run pyflink scripts on AWS EMR?


I am struggling to run the basic word_count.py PyFlink example that ships with Apache Flink on AWS EMR.

Steps taken:

  1. Successfully created an AWS EMR 6.5.0 cluster with the Flink and Zookeeper applications, and verified that the flink and flink-yarn-session binaries are in $PATH. AWS says it installed Flink 1.14.

  2. Ran the Java version successfully with the following:

  • sudo flink-yarn-session
  • sudo flink run -m yarn-cluster -yid <application_id> /usr/lib/flink/examples/batch/WordCount.jar
  3. Tried running the same thing with Python, but no dice:
  • sudo flink run -m yarn-cluster -yid <application_id> -py /usr/lib/flink/examples/python/table/word_count.py

This fails, and the error makes it obvious that it's picking up Python 2.7 even though Python 3 is the default!
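A simple interpreter probe, of the kind this question uses to print sys.version, could look like the sketch below when submitted with `flink run ... -py probe.py` (the file name probe.py is hypothetical). It prints which interpreter actually ran the script and whether that version falls in the range PyFlink 1.14 supports; the 3.6 to 3.8 range is my reading of the Flink 1.14 docs, so treat it as an assumption. It deliberately avoids f-strings and type hints so it still runs, and reports, if Flink launches it with Python 2.7.

```python
import sys

# Assumption: PyFlink 1.14 (what EMR 6.5.0 installs) supports Python 3.6 to 3.8.
SUPPORTED = [(3, 6), (3, 7), (3, 8)]


def describe_interpreter():
    """Return the running interpreter's path, version and support status.

    Uses %-formatting only, so the probe also works under Python 2.7.
    """
    major, minor = sys.version_info[:2]
    status = "supported" if (major, minor) in SUPPORTED else "NOT supported"
    return "%s -> Python %d.%d (%s by PyFlink 1.14)" % (
        sys.executable, major, minor, status)


if __name__ == "__main__":
    print(describe_interpreter())
    print(sys.version)
```

If the first line shows a Python 2.7 path, the client is not using the interpreter you intended.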

  4. Fixed the issue by loosely following this link, then ran a simple example that prints sys.version. This confirmed that it is now picking up my Python version.

  5. Tried again with a venv:

  • sudo flink run -m yarn-cluster -yid <application_id> -pyarch file:///home/hadoop/venv.zip -pyclientexec venv.zip/venv/bin/python3 -py /usr/lib/flink/examples/python/table/word_count.py
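One possible cause of "file not found" errors with -pyarch is a layout mismatch: the first component of the -pyclientexec path has to be the directory the archive is extracted into (the archive's file name, unless you append `#dirname` to the -pyarch value), and the rest has to be an entry inside the zip. For example, zipping from inside the venv directory produces entries like `bin/python3` instead of `venv/bin/python3`. The hypothetical helper below checks this before submitting; it reflects my understanding of how Flink resolves these paths and is not a Flink API.

```python
import os
import zipfile


def check_pyarch_layout(pyarch, pyexec):
    """Check that `pyexec` (e.g. "venv.zip/venv/bin/python3") exists in the `pyarch` zip.

    Assumption: "-pyarch path#dir" extracts into "dir"; without "#dir" the
    archive's file name is used as the extraction directory.
    Returns (ok, message).
    """
    archive, _, target_dir = pyarch.partition("#")
    if archive.startswith("file://"):
        archive = archive[len("file://"):]
    target_dir = target_dir or os.path.basename(archive)

    prefix, _, member = pyexec.partition("/")
    if prefix != target_dir:
        return False, "%r does not start with extraction dir %r" % (pyexec, target_dir)

    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()
    if member not in names:
        return False, "%r not in %s (first entries: %r)" % (member, archive, names[:3])
    return True, "ok"
```

For the command in this question, `check_pyarch_layout("file:///home/hadoop/venv.zip", "venv.zip/venv/bin/python3")` should return `(True, "ok")` if the zip was created with `zip -r venv.zip venv` from /home/hadoop.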

At this point, I start seeing various issues, ranging from "file not found" to the mysterious:

pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql

I tried various permutations, with and without the YARN cluster, but have made no progress so far.

I suspect my issues are either environment-related (why AWS doesn't set up the proper Python version is beyond me) or due to my inexperience with YARN/PyFlink.

Any pointers would be greatly appreciated.


Solution

  • Here is what works. To create the cluster:

    aws emr create-cluster --release-label emr-6.5.0 --applications Name=Flink --configurations file://./config.json --region us-west-2 --log-uri s3://SOMEBUCKET --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes KeyName=YOURKEYNAME,InstanceProfile=EMR_EC2_DefaultRole --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,Args=flink-yarn-session,-d
    

    Contents of config.json:

    [
      {
        "Classification": "flink-conf",
        "Properties": {
          "python.executable": "python3",
          "python.client.executable": "python3"
        },
        "Configurations": [
          
        ]
      }
    ]
    

    Then, once you are logged in to the cluster's master node, run:

    sudo flink run -m yarn-cluster -yid YID -py /usr/lib/flink/examples/python/table/batch/word_count.py
    

    You can find the YID (the YARN application ID of the Flink session) in the AWS EMR console under Application user interfaces.
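If you would rather not look the ID up in the console, it can also be read from `yarn application -list` on the master node. The sketch below assumes each data row starts with the application ID and that a Flink YARN session shows up with the application type `Apache Flink` and state `RUNNING`; check both against your cluster's output. The helper names are hypothetical, and the script only prints the resulting `flink run` command instead of running it.

```python
import re
import subprocess
import sys

# YARN application IDs look like application_<cluster timestamp>_<sequence>.
APP_ID = re.compile(r"^(application_\d+_\d+)\s")
WORD_COUNT = "/usr/lib/flink/examples/python/table/batch/word_count.py"


def find_flink_session_ids(listing):
    """Return IDs of running Flink applications from `yarn application -list` output."""
    ids = []
    for line in listing.splitlines():
        match = APP_ID.match(line.strip())
        # Assumption: the session's type column reads "Apache Flink".
        if match and "Apache Flink" in line and "RUNNING" in line:
            ids.append(match.group(1))
    return ids


def flink_run_command(yid, script):
    """Build the same command used in this answer for the given session ID."""
    return ["sudo", "flink", "run", "-m", "yarn-cluster", "-yid", yid, "-py", script]


if __name__ == "__main__":
    listing = subprocess.run(
        ["yarn", "application", "-list"],
        stdout=subprocess.PIPE, universal_newlines=True, check=True,
    ).stdout
    ids = find_flink_session_ids(listing)
    if len(ids) != 1:
        sys.exit("expected exactly one running Flink session, found: %s" % ids)
    print(" ".join(flink_run_command(ids[0], WORD_COUNT)))
```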
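If you generate config.json from a script, for example to pin an absolute interpreter path such as /usr/bin/python3 (an example path; use whatever exists on your nodes) instead of relying on whatever `python3` resolves to, a small sketch could look like this. The function name and its `interpreter` parameter are my own; the JSON it writes has the same shape as the config.json in this answer. As I understand the Flink settings, `python.executable` is the interpreter for the Python workers, while `python.client.executable` is the one `flink run -py` uses on the client to run the script.

```python
import copy
import json

# Same structure as config.json in this answer: a single "flink-conf" classification.
FLINK_CONF = [
    {
        "Classification": "flink-conf",
        "Properties": {
            "python.executable": "python3",         # Python worker processes
            "python.client.executable": "python3",  # client running the -py script
        },
        "Configurations": [],
    }
]


def write_config(path="config.json", interpreter="python3"):
    """Write an EMR configurations file pointing both Python settings at `interpreter`."""
    conf = copy.deepcopy(FLINK_CONF)  # leave the template untouched
    props = conf[0]["Properties"]
    props["python.executable"] = interpreter
    props["python.client.executable"] = interpreter
    with open(path, "w") as fh:
        json.dump(conf, fh, indent=2)
    return conf


if __name__ == "__main__":
    write_config()  # then pass --configurations file://./config.json to create-cluster
```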