Tags: apache-spark, pyspark, apache-beam, amazon-emr, spark-submit

apache-beam installation issue on AWS EMR-EC2 cluster


I started an AWS EMR-EC2 cluster, and I am having trouble getting the SparkRunner of apache-beam to work. I have a Python script that uses apache-beam. I have tried both aws emr add-steps and SSHing into the primary node to run spark-submit, and both times I get a package-not-found error for apache-beam.

  1. I tried using a bootstrap action to install apache-beam:
#!/bin/bash
pip install --upgrade pip
pip install setuptools --upgrade
pip install "apache_beam[aws]==2.61.0"

When I add steps or submit jobs, it still says the package is not found.

  2. I tried manually installing apache-beam on all 3 nodes as root, but when I run spark-submit, it still says the package is not found.

  3. I tried using a Docker image:

  spark-submit --master yarn \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=user/image:latest \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.pyspark.python=python3 \
    --conf spark.pyspark.driver.python=python3 \
    --conf spark.executorEnv.PYTHONPATH="/usr/local/lib/python3.7/site-packages" \
    beam_test_local.py \
    --input_pattern_list="s3path" \
    --runner=SparkRunner \
    --spark_master=yarn \
    --region=us-east-1

I grepped all the logs for "docker" and for my username, but there is no sign of the Docker image being pulled at all.

  4. The closest I have gotten to a working setup is a Python venv, but it is not very portable and I keep running into issues.
aws emr add-steps \
    --cluster-id j-id \
    --steps Type=Spark,Name="SparkJob",ActionOnFailure=CONTINUE,Args="[
    '--master', 'yarn',
    '--deploy-mode', 'cluster',
    '--conf', 'spark.archives=s3://s3bucket/pyspark_venv.tar.gz#environment', 
    '--conf', 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python',
    '--conf', 'spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python',
    'pythonfile'
    ]" \
    --region us-east-1 

This gets past the package-not-found error, but then fails with:

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1733453351.558696    7840 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/beam_test.py", line 55, in <module>
    run()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/beam_test.py", line 49, in run
    (p
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/pipeline.py", line 620, in __exit__
    self.result = self.run()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/pipeline.py", line 594, in run
    return self.runner.run_pipeline(self, self._options)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/runner.py", line 180, in run_pipeline
    return self.run_portable_pipeline(
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 375, in run_portable_pipeline
    job_service_handle = self.create_job_service(options)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 289, in create_job_service
    return self.create_job_service_handle(server.start(), options)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 79, in start
    self._endpoint = self._job_server.start()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 104, in start
    cmd, endpoint = self.subprocess_cmd_and_endpoint()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 149, in subprocess_cmd_and_endpoint
    jar_path = self.local_jar(self.path_to_jar(), self._jar_cache_dir)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 146, in local_jar
    return subprocess_server.JavaJarServer.local_jar(url, jar_cache_dir)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/utils/subprocess_server.py", line 382, in local_jar
    os.makedirs(cache_dir)
  File "/usr/lib64/python3.9/os.py", line 215, in makedirs
    makedirs(head, exist_ok=exist_ok)
  File "/usr/lib64/python3.9/os.py", line 215, in makedirs
    makedirs(head, exist_ok=exist_ok)
  File "/usr/lib64/python3.9/os.py", line 225, in makedirs
    mkdir(name, mode)
PermissionError: [Errno 13] Permission denied: '/home/.apache_beam'
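The last frames show Beam's job server trying to create its jar cache under /home/.apache_beam, i.e. os.path.expanduser('~') resolved to /home, which the YARN container user cannot write to. A minimal workaround sketch, assuming the cache location is derived from HOME (which is what expanduser reads first on POSIX): point HOME at a writable directory before apache_beam is imported, since Beam appears to compute its default cache path at import time. The helper name ensure_writable_home is hypothetical. Passing --conf spark.yarn.appMasterEnv.HOME=/tmp on the step should have a similar effect for the driver in cluster mode (untested).

```python
import os
import tempfile


def ensure_writable_home(fallback=None):
    """Point HOME at a writable directory if the current one is not usable.

    Hypothetical helper: Beam's job-server jar cache lives under
    ~/.apache_beam, and on POSIX os.path.expanduser("~") reads HOME first.
    Call this *before* importing apache_beam.
    """
    home = os.path.expanduser("~")
    if os.path.isdir(home) and os.access(home, os.W_OK):
        return home  # current HOME is writable, leave it alone
    # Fall back to the system temp dir (e.g. /tmp) unless a directory is given.
    new_home = fallback or tempfile.gettempdir()
    os.makedirs(new_home, exist_ok=True)
    os.environ["HOME"] = new_home
    return new_home


if __name__ == "__main__":
    print("HOME used for Beam caches:", ensure_writable_home())
    # import apache_beam as beam  # import only after HOME has been fixed
```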

I am very lost right now and would appreciate any help.
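A common cause of package-not-found errors like the ones in the earlier attempts is that pip installed Beam into a different interpreter than the one Spark actually launches (the traceback shows /usr/lib64/python3.9, while the Docker attempt points PYTHONPATH at a python3.7 site-packages directory). A small diagnostic sketch, using the hypothetical name report_interpreter, that could sit at the top of the job script to log which interpreter the driver runs and whether that interpreter can see apache_beam:

```python
import importlib.util
import sys


def report_interpreter(package="apache_beam"):
    """Return which Python is running and where (if anywhere) it finds `package`."""
    spec = importlib.util.find_spec(package)  # None if not importable here
    return {
        "executable": sys.executable,       # interpreter Spark actually launched
        "version": sys.version.split()[0],   # e.g. "3.9.x"
        "package": package,
        "found": spec is not None,
        # File the package would be imported from (its __init__.py), if found.
        "location": spec.origin if spec is not None else None,
        # Directories this interpreter searches for modules.
        "sys_path": list(sys.path),
    }


if __name__ == "__main__":
    for key, value in report_interpreter().items():
        print(f"{key}: {value}", file=sys.stderr)
```

If "found" is False while pip reports Beam as installed, comparing "executable" with the interpreter pip belongs to (python3 -m pip --version) usually shows the mismatch.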


Solution

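  • When I use spark-submit, I do not need to set the runner to SparkRunner. spark-submit already launches the script as a Spark application on the cluster, so there is no need to specify a runner in my code, and dropping --runner=SparkRunner also avoids starting Beam's Spark job server, which is where the PermissionError above came from. Note that with no runner set, Beam falls back to its default DirectRunner, so the pipeline itself executes inside the Python driver process rather than being translated into distributed Spark stages.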
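Separately, on the Docker attempt: the spark.kubernetes.container.image* settings are only used when Spark's master is a k8s:// URL, so with --master yarn they have no effect, which matches the logs showing no image pull. On Amazon EMR 6.x and later, YARN can run containers in Docker when the YARN_CONTAINER_RUNTIME_TYPE and YARN_CONTAINER_RUNTIME_DOCKER_IMAGE environment variables are set for both the application master and the executors. The sketch below (hypothetical helper yarn_docker_confs) only builds those --conf arguments; it assumes the cluster was launched with the container-executor configuration classification listing the image's registry as trusted, and that the image contains Python with apache-beam installed, neither of which it sets up.

```python
import shlex


def yarn_docker_confs(image):
    """Build spark-submit --conf flags asking YARN to run the application
    master (driver, in cluster mode) and executors inside `image`.
    Hypothetical helper; the env-var names follow the Amazon EMR
    "Spark with Docker" documentation."""
    env = {
        "YARN_CONTAINER_RUNTIME_TYPE": "docker",
        "YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": image,
    }
    args = []
    # In cluster mode the driver runs in the YARN application master, so it
    # needs the same runtime settings as the executors.
    for prefix in ("spark.executorEnv.", "spark.yarn.appMasterEnv."):
        for name, value in env.items():
            args += ["--conf", f"{prefix}{name}={value}"]
    return args


if __name__ == "__main__":
    cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
           *yarn_docker_confs("user/image:latest"),
           "beam_test_local.py", "--input_pattern_list=s3path"]
    print(shlex.join(cmd))  # paste into a shell, or pass cmd to subprocess.run
```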