apache-flink apache-beam

What environment_config should I use when launching Beam on Flink?


I am hoping for guidance on how to set --environment_config when running the Beam wordcount.py demo.

It runs fine with the DirectRunner. Flink's own WordCount example also runs fine (i.e., when submitted via flink run).

I would like to run Beam with the Flink runner against a "separate Flink cluster", as described in the Beam documentation. I can't use Docker, so I plan to use --environment_type=PROCESS.
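For context, running against a separate Flink cluster in PROCESS mode generally means passing a handful of pipeline options together. The following is a minimal sketch, not an official recipe: the cluster address localhost:8081 and the helper name flink_process_args are assumptions, and --flink_master is the flag name in recent Beam releases (older releases may use a different name).

```python
import json
import platform

# Assumed address of the separate Flink cluster's JobManager REST endpoint;
# adjust to your setup.
FLINK_MASTER = "localhost:8081"


def flink_process_args(command, flink_master=FLINK_MASTER):
    """Build pipeline arguments for the FlinkRunner in PROCESS mode (hypothetical helper)."""
    environment_config = {
        "os": platform.system().lower(),
        "arch": platform.machine(),
        # Executable each Flink worker launches to start the Python SDK harness.
        "command": command,
    }
    return [
        "--runner=FlinkRunner",
        "--flink_master={}".format(flink_master),
        "--environment_type=PROCESS",
        "--environment_config={}".format(json.dumps(environment_config)),
    ]


if __name__ == "__main__":
    # "/path/to/boot" is a placeholder; these args would typically be
    # passed on to wordcount.py or to PipelineOptions(...).
    for arg in flink_process_args("/path/to/boot"):
        print(arg)
```

The whole environment_config value is a single JSON string, which is why it is serialized with json.dumps rather than built by hand.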

I am using the following inside the Python code to set environment_config:

  import json
  import platform

  # Build the JSON payload for --environment_config (PROCESS mode)
  environment_config = dict()
  environment_config['os'] = platform.system().lower()
  environment_config['arch'] = platform.machine()
  environment_config['command'] = 'ls'  # placeholder, known to be wrong
  ec = "--environment_config={}".format(json.dumps(environment_config))

The command is obviously incorrect. When I run this, Flink does receive and successfully process the DataSource subtasks, but the job eventually times out on the CHAIN MapPartitions task.

Could someone provide guidance (or links) as to how to set environment_config? I am running Beam within a Singularity container.


Solution

  • For environment_type=DOCKER, almost everything is taken care of for you, but in PROCESS mode you have to do much of the setup yourself. The command you're looking for is sdks/python/container/build/target/launcher/linux_amd64/boot. Every worker machine needs both that executable (which you can build from source with ./gradlew :sdks:python:container:build) and a Python installation that includes Beam and any other dependencies.

    The best example I know of is here: https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165
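As a rough sketch of what that means in practice, you point the "command" key at the boot launcher and make sure each worker actually has it. Assumptions to flag: the Beam checkout location /opt/beam and the helper names boot_environment_config and worker_ready are hypothetical, and as far as I know "command" is the only required key for PROCESS mode ("os", "arch" and "env" are optional).

```python
import json
import os

# Hypothetical location of a Beam source checkout on each worker.
BEAM_SRC = "/opt/beam"
# Path of the launcher produced by ./gradlew :sdks:python:container:build
BOOT_RELATIVE = "sdks/python/container/build/target/launcher/linux_amd64/boot"


def boot_environment_config(beam_src=BEAM_SRC):
    """Return an --environment_config flag pointing at the boot launcher."""
    boot = os.path.join(beam_src, BOOT_RELATIVE)
    # Only 'command' is set; 'os', 'arch' and 'env' are left out (assumed optional).
    return "--environment_config={}".format(json.dumps({"command": boot}))


def worker_ready(beam_src=BEAM_SRC):
    """Rough per-worker check: boot binary is executable and Beam is importable."""
    boot = os.path.join(beam_src, BOOT_RELATIVE)
    has_boot = os.path.isfile(boot) and os.access(boot, os.X_OK)
    try:
        import apache_beam  # noqa: F401  (only checks the Python running this script)
        has_beam = True
    except ImportError:
        has_beam = False
    return has_boot and has_beam


if __name__ == "__main__":
    print(boot_environment_config())
    print("worker ready:", worker_ready())
```

Running worker_ready on each machine (for example, inside the Singularity container that Flink's TaskManagers use) is a quick way to rule out a missing launcher or Beam install before chasing timeouts in Flink.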