When I try to run a TFX pipeline / Apache Beam job on a Flink runner, it works fine with 1 task manager (on one node) and parallelism 2 (2 task slots per task manager). But it hangs when I use higher parallelism across more than one task manager, with this message repeating constantly on both task managers:
INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory [] - Still waiting for startup of environment from a65a0c5f8f962428897aac40763e57b0-1334930809.eu-central-1.elb.amazonaws.com:50000 for worker id 1-1
The Flink cluster runs as a native Kubernetes deployment on an AWS EKS cluster.
I use the following parameters:
EDIT: Adding more information about the configuration.
I have configured the Beam workers to run as sidecars (at least, that is my understanding of how it should work) by setting the Flink parameter:
It points to a pod template file with the following contents:
apiVersion: v1
kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 50000
          name: pool
      livenessProbe:
        tcpSocket:
          port: 50000
        initialDelaySeconds: 30
        periodSeconds: 60
I have also created a Kubernetes load balancer in front of the task managers so that clients can connect on port 50000, and I use its address when configuring:
EDIT 2: It looks like the Beam SDK harness on one task manager tries to connect to an endpoint running on the other task manager, but looks for it on localhost:
Log from beam-worker-pool on TM 2:
2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial server at localhost:33705
caused by:
context deadline exceeded
The provision endpoint that is actually listening on port 33705 is on TM 1, but the harness on TM 2 looks for it on localhost, so it cannot connect to it.
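The same reachability check can be scripted from inside the beam-worker-pool container, which already has Python because it runs worker_pool_main. This is a minimal sketch using only the standard library; the helper name `is_listening` is hypothetical, and it only tests whether something accepts TCP connections on the port, not whether that something is a Beam provision service.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and opens a TCP socket;
        # the context manager closes it again immediately.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError (nothing listening) and timeouts.
        return False
```

For example, inside the beam-worker-pool container on TM 2, `is_listening("localhost", 33705)` would be expected to return False, matching the "failed to dial server" error above.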
EDIT 3: Here is how I tested this:
TM 1:
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial server at localhost:40983
caused by:
context deadline exceeded
TM 2:
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial server at localhost:32907
caused by:
context deadline exceeded
TM 1:
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...
TM 2:
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output
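So the worker pool on each task manager is receiving the other task manager's request: the endpoints it is told to use listen only on the other pod, which is why each harness fails to reach them on localhost. I'm not sure how to fix this.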
Thanks, Gorjan
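I was able to fix this by setting the Beam SDK worker pool address to localhost instead of using the load balancer. Because the worker pool runs as a sidecar in every task manager pod, and containers in a pod share a network namespace, localhost:50000 always reaches the pool in the same pod as the requesting task manager, so the localhost endpoints it is given are actually reachable. The config I use now is: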
"--environment_config=localhost:50000", # <--- Changed the address to localhost
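A minimal sketch of how the full Beam argument list might be assembled in Python with the worker pool on localhost. The Flink master address and the parallelism value are hypothetical placeholders, since the original parameter list isn't shown, and the helper name `flink_beam_args` is also hypothetical. `--runner`, `--flink_master`, `--parallelism`, `--environment_type` and `--environment_config` are standard Beam Flink/portable runner options. In a TFX pipeline, a list like this would typically be passed as `beam_pipeline_args`.

```python
from typing import List


def flink_beam_args(flink_master: str, parallelism: int,
                    worker_pool: str = "localhost:50000") -> List[str]:
    """Build Beam args for a Flink job that uses an EXTERNAL SDK worker pool.

    worker_pool defaults to localhost because the beam-worker-pool container
    is assumed to run as a sidecar in every task manager pod, so each task
    manager reaches the pool in its own pod rather than one behind a load balancer.
    """
    return [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",       # hypothetical job manager address
        f"--parallelism={parallelism}",
        "--environment_type=EXTERNAL",           # use an already-running worker pool
        f"--environment_config={worker_pool}",  # pod-local address, not the load balancer
    ]


if __name__ == "__main__":
    # Hypothetical values for illustration only.
    print(flink_beam_args("flink-jobmanager:8081", parallelism=4))
```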