I am struggling to run the basic word_count.py
pyflink example that comes loaded with the apache flink on AWS EMR
Steps taken:
Successfully created AWS EMR 6.5.0 cluster with the following applications [Flink, Zookeeper] - verified that there is a flink
and flink-yarn-session
binary in $PATH
. AWS says it installed v1.14
Ran the java version successfully by doing the following
sudo flink-yarn-sessions
sudo flink run -m yarn-cluster -yid <application_id> /usr/lib/flink/examples/batch/WordCount.jar
sudo flink run -m yarn-cluster -yid <application_id> -py /usr/lib/flink/examples/python/table/word_count.py
This fails but error makes it obvious that its picking up python2.7
even though python3
is default!!
Fixed the issue by somewhat following this link. Then tried with a simple example to print out sys.version
. This confirmed that its picking up my python version
Try again with venv
sudo flink run -m yarn-cluster -yid <application_id> -pyarch file:///home/hadoop/venv.zip -pyclientexec venv.zip/venv/bin/python3 -py /usr/lib/flink/examples/python/table/word_count.py
At this point, I start seeing various issues ranging from no file found
to mysterious
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
I ran various permutation of with/without yarn cluster. But no progress made thus far.
I am thinking my issues are either environment related (why isn't AWS taking care of proper python version is beyond me) or my inexperience with yarn/pyflink.
Any pointer would be greatly appreciated.
This is what you do. To make a cluster:
aws emr create-cluster --release-label emr-6.5.0 --applications Name=Flink --configurations file://./config.json --region us-west-2 --log-uri s3://SOMEBUCKET --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes KeyName=YOURKEYNAME,InstanceProfile=EMR_EC2_DefaultRole --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,Args=flink-yarn-session,-d
Contents of config.json:
"Classification": "flink-conf",
"Properties": {
"python.executable": "python3",
"python.client.executable": "python3"
"Configurations": [
Then once you are in, try this
sudo flink run -m yarn-cluster -yid YID -py /usr/lib/flink/examples/python/table/batch/word_count.py
You can find the YID in the AWS EMR console under application user interfaces.