Tags: java, docker, apache-flink

Problem when starting tasks in docker standalone flink


We have developed a Flink system that reads files from a directory, groups them by client and, depending on the type of information, pushes them to a sink. With a local installation of Flink on our machines this worked without issues. However, when we dockerized the project, the job is submitted correctly and the UI shows it as running, but it never actually starts. The job details look like this in the UI: [Flink dashboard screenshot]

Logs in the job are showing the following:

2021-10-15 12:13:50,247 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a).
2021-10-15 12:13:50,247 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
2021-10-15 12:13:50,269 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms
2021-10-15 12:13:50,279 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3ae62e37
2021-10-15 12:13:50,280 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'jobmanager'
2021-10-15 12:13:50,296 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2021-10-15 12:13:50,304 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3370d640 for Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a).
2021-10-15 12:13:50,319 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a) under job master id 00000000000000000000000000000000.
2021-10-15 12:13:50,322 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-10-15 12:13:50,323 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Decibel Processor (d7ea5533f968c1d27e06f45c94e5823a) switched from state CREATED to RUNNING.
2021-10-15 12:13:50,333 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Property Config Stream (1/1) (1577d0fa1357c72c49057d4cdd7e2ddd) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom File Source (1/1) (73a31f7f635a874d2f8df49e7184b83d) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Collection File Input -> Map -> Collection Messages -> Filter Collection Messages -> WaterMark Filter Message (1/1) (55cf6f5740276160f44cc651c0971af8) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Session Window -> Filter -> Map -> Sink: Unnamed (1/1) (0d5400e22e1a0b8491f0a98ffcf3ac12) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,333 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Reduce -> Stats output -> Sink: Unnamed (1/1) (f2fefa969c0f63f43c817e4c981b6edf) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,334 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Storage side output (1/1) (b3204e02a57a8bc0b04a63f044050fd6) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,334 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: In progress sessions side output (1/1) (9def69110da478c06dfc1636c76349aa) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,334 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: In progress page views side output (1/1) (cec4378c07fe32c56c9db45ae144079b) switched from CREATED to SCHEDULED.
2021-10-15 12:13:50,361 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-10-15 12:13:50,366 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
2021-10-15 12:13:50,369 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_2 for job d7ea5533f968c1d27e06f45c94e5823a.
2021-10-15 12:13:50,375 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_2 for job d7ea5533f968c1d27e06f45c94e5823a.
2021-10-15 12:13:50,378 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-10-15 12:13:50,381 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job d7ea5533f968c1d27e06f45c94e5823a: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
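The last log line is the telltale: the JobMaster declares its resource requirements, but no SCHEDULED-to-DEPLOYING transitions follow, which suggests no task slots were ever offered. One way to confirm this (a sketch; it assumes the web UI port mapped in the compose file is reachable on localhost:8081) is to query Flink's REST API:

```shell
# Sketch: on a live cluster you would fetch the overview with
#   overview=$(curl -s http://localhost:8081/overview)
# Here a sample payload stands in for what a slot-starved cluster returns.
overview='{"taskmanagers":0,"slots-total":0,"slots-available":0,"jobs-running":1}'

# Extract the number of registered TaskManagers from the JSON.
taskmanagers=$(echo "$overview" | grep -o '"taskmanagers":[0-9]*' | cut -d: -f2)

if [ "$taskmanagers" -eq 0 ]; then
  echo "no TaskManagers registered - tasks stay SCHEDULED forever"
fi
```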

But if we add a file that should be processed to the watched folder, nothing happens.

We use docker-compose for this, since it has to be deployed alongside other parts of our codebase that are not related to Flink (I'm omitting the parts that are not relevant):

short-term-processor:
    # Define the name and version of the docker image
    image: short-term-processor
    container_name: short_term_processor
    command: standalone-job --job-classname com.decibel.flink.processor.Main
    restart: always
    ports:
     - "8081:8081"
    volumes:
      - /flink/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
      - /flink/processor.properties:/opt/flink/conf/processor.properties
      - /flink/data:/data1

The config YAML and properties files are exactly the same as the ones that work without issues on our local clusters. Any idea why this may be happening?


Solution

  • After several tries, we found the problem and the solution: the standalone Docker container only submits the job; with no TaskManagers (and therefore no task slots) in the cluster, it never gets started.

    To solve this, we need to create two extra containers, one for the JobManager and one for a TaskManager:

    service:
        # Define the name and version of the docker image
        image: service
        container_name: service
        command: "flink run /opt/flink/usrlib/artifacts/service.jar --port 9000"
        # Logs are pushed to syslog
        logging:
          driver: syslog
        restart: always
        ports:
          - "9000:9000"
        volumes:
          - /etc/localtime:/etc/localtime:ro
          - /var/log/decibel/:/opt/flink/log/
          - /conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
          - /conf/processor.properties:/opt/flink/conf/processor.properties
          - /data1:/data1/decibelinsightdata
        depends_on:
          - jobmanager
          - taskmanager
    jobmanager:
        image: apache/flink:1.13.0-scala_2.11-java11
        command: "jobmanager.sh start-foreground"
        ports:
          - "8081:8081"
        volumes:
          - /data1:/data1/decibelinsightdata
          - /conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
    taskmanager:
        image: apache/flink:1.13.0-scala_2.11-java11
        command: "taskmanager.sh start-foreground"
        volumes:
          - /data1:/data1/decibelinsightdata
          - /conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
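    For the TaskManager to find the JobManager and offer a slot, the shared flink-conf.yaml mounted into the containers has to point at the jobmanager service. A minimal sketch, consistent with the hostname in the logs above (the actual file is not shown in the question):

    ```yaml
    # Sketch of the relevant flink-conf.yaml keys; the hostname must match
    # the compose service name of the JobManager container.
    jobmanager.rpc.address: jobmanager
    jobmanager.rpc.port: 6123
    # One slot suffices here: the log shows numberOfRequiredSlots=1.
    taskmanager.numberOfTaskSlots: 1
    ```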
    
    

    After this, the job starts without any issue.
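    An alternative worth noting: the original Application Mode setup (the `standalone-job` entrypoint from the question) can also work; what it was missing is a TaskManager to provide slots. A sketch, reusing the question's image and mounts (service names are illustrative, and `jobmanager.rpc.address` in the mounted flink-conf.yaml would then have to resolve to the `short-term-processor` service):

    ```yaml
    # Sketch: keep the standalone-job container from the question and add
    # the TaskManager it was missing. Mounts mirror the question's setup.
    short-term-processor:
      image: short-term-processor
      command: standalone-job --job-classname com.decibel.flink.processor.Main
      ports:
        - "8081:8081"
      volumes:
        - /flink/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
        - /flink/processor.properties:/opt/flink/conf/processor.properties
        - /flink/data:/data1
    taskmanager:
      image: short-term-processor   # same image, so the job's classes are on the classpath
      command: taskmanager
      depends_on:
        - short-term-processor
      volumes:
        - /flink/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
        - /flink/data:/data1
    ```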