We have developed a Flink job that reads files from a directory, groups them by client, and, depending on the type of information, pushes them to a sink. With a local installation of Flink on our machines it worked without issues. However, when we dockerized the project, the job is submitted correctly and the UI shows it as running, but it never actually starts. The job details in the UI look like this: Flink dashboard screenshot
The job logs show the following:
2021-10-15 12:13:50,247 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a).
2021-10-15 12:13:50,247 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-10-15 12:13:50,269 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms
2021-10-15 12:13:50,279 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3ae62e37
2021-10-15 12:13:50,280 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'jobmanager'
2021-10-15 12:13:50,296 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2021-10-15 12:13:50,304 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3370d640 for Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a).
2021-10-15 12:13:50,319 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a) under job master id 00000000000000000000000000000000.
2021-10-15 12:13:50,322 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-10-15 12:13:50,323 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a) switched from state CREATED to RUNNING.
2021-10-15 12:13:50,333 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Property Config Stream (1/1) (1577d0fa1357c72c49057d4cdd7e2ddd) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom File Source (1/1) (73a31f7f635a874d2f8df49e7184b83d) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Collection File Input -> Map -> Collection Messages -> Filter Collection Messages -> WaterMark Filter Message (1/1) (55cf6f5740276160f44cc651c0971af8) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Session Window -> Filter -> Map -> Sink: Unnamed (1/1) (0d5400e22e1a0b8491f0a98ffcf3ac12) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Keyed Reduce -> Stats output -> Sink: Unnamed (1/1) (f2fefa969c0f63f43c817e4c981b6edf) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,334 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Storage side output (1/1) (b3204e02a57a8bc0b04a63f044050fd6) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,334 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: In progress sessions side output (1/1) (9def69110da478c06dfc1636c76349aa) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,334 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: In progress page views side output (1/1) (cec4378c07fe32c56c9db45ae144079b) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,361 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-10-15 12:13:50,366 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager address, beginning registration
2021-10-15 12:13:50,369 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_2 for job d7ea5533f968c1d27e06f45c94e5823a.
2021-10-15 12:13:50,375 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_2 for job d7ea5533f968c1d27e06f45c94e5823a.
2021-10-15 12:13:50,378 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-10-15 12:13:50,381 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job d7ea5533f968c1d27e06f45c94e5823a: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
But if we add a file to the watched folder, nothing happens.
We use docker-compose for this, since the job has to be deployed alongside other components that are not related to Flink (I'm omitting the parts that are not relevant):
short-term-processor:
  # Define the name and version of the docker image
  image: short-term-processor
  container_name: short_term_processor
  command: standalone-job --job-classname com.decibel.flink.processor.Main
  restart: always
  ports:
    - "8081:8081"
  volumes:
    - /flink/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
    - /flink/processor.properties:/opt/flink/conf/processor.properties
    - /flink/data:/data1
The config YAML and properties files are exactly the same as the ones that work without issues on our local clusters. Any idea why this may be happening?
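In hindsight, the last log line ("Received resource requirements ... numberOfRequiredSlots=1") followed by silence was the clue: no TaskManager ever registered to offer a slot. One way to confirm this is to query the JobManager's REST endpoint at http://localhost:8081/taskmanagers (the same port the UI uses). Below is a minimal sketch of interpreting that response; the field names follow Flink's `/taskmanagers` REST response, but the helper and sample payloads are hand-written for illustration, not captured from a real cluster.

```python
def summarize_slots(payload: dict) -> tuple:
    """Return (total_slots, free_slots) across all registered TaskManagers."""
    tms = payload.get("taskmanagers", [])
    total = sum(tm.get("slotsNumber", 0) for tm in tms)
    free = sum(tm.get("freeSlots", 0) for tm in tms)
    return total, free

# With no TaskManager container running, the list is empty, so the job can
# never be granted the 1 slot it requires and stays SCHEDULED indefinitely:
print(summarize_slots({"taskmanagers": []}))  # (0, 0)

# Once a TaskManager registers, slots become available and tasks can deploy:
sample = {"taskmanagers": [{"id": "tm-1", "slotsNumber": 1, "freeSlots": 1}]}
print(summarize_slots(sample))  # (1, 1)
```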
After several tries, we found the problem and the solution: the standalone-job container only submits the job, but with no TaskManager in the compose file there are no task slots, so the job stays SCHEDULED and never actually starts.
To solve this, we created two extra containers, one for the JobManager and one for the TaskManager:
service:
  # Define the name and version of the docker image
  image: service
  container_name: service
  command: "flink run /opt/flink/usrlib/artifacts/service.jar --port 9000"
  # Logs are pushed to syslog
  logging:
    driver: syslog
  restart: always
  ports:
    - "9000:9000"
  volumes:
    - /etc/localtime:/etc/localtime:ro
    - /var/log/decibel/:/opt/flink/log/
    - /conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
    - /conf/processor.properties:/opt/flink/conf/processor.properties
    - /data1:/data1/decibelinsightdata
  depends_on:
    - jobmanager
    - taskmanager
jobmanager:
  image: apache/flink:1.13.0-scala_2.11-java11
  command: "jobmanager.sh start-foreground"
  ports:
    - 8081:8081
  volumes:
    - /data1:/data1/decibelinsightdata
    - /conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
  image: apache/flink:1.13.0-scala_2.11-java11
  command: "taskmanager.sh start-foreground"
  volumes:
    - /data1:/data1/decibelinsightdata
    - /conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
After this, the job starts without any issue.
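For completeness, the original standalone-job entrypoint can also work if a TaskManager service is added next to it. The sketch below follows the Application Mode docker-compose examples in the Flink Docker documentation; the image name and job class are reused from our original compose file, and we have not tested this variant in our setup.

```yaml
short-term-processor:
  image: short-term-processor
  command: standalone-job --job-classname com.decibel.flink.processor.Main
  ports:
    - "8081:8081"
taskmanager:
  # Same image so the job classes are on the TaskManager's classpath
  image: short-term-processor
  command: taskmanager
  depends_on:
    - short-term-processor
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: short-term-processor
```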