scala · docker · logging · apache-flink

Apache Flink: The file STDOUT is not available on the TaskExecutor


I started Flink with the following docker-compose.yml from the official Flink repository. I only added the connection to the external Hadoop network.

version: "2.1"

networks:
  hadoop:
    external:
      name: flink_hadoop

services:
  jobmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-jobmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-taskmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

After this, everything runs and I can access the Web UI.

Then I packaged the following job.

import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory
import stoff.schnaps.pojo.ActorMovie

object HdfsJob {
  private lazy val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val actorMovies = env
      .readCsvFile[ActorMovie](
      "hdfs://namenode:8020/source-data/test.tsv",
      "\r\n",
      "\t",
      includedFields = Array(2,3,5),
      pojoFields = Array("actor",
                         "film",
                         "character"))

    actorMovies.print

    // execute program
    env.execute("Flink Batch Scala API Skeleton")
  }
}
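
ActorMovie is a plain POJO with the three fields referenced in pojoFields; a minimal sketch of such a class (field types assumed to be String) looks like this:

package stoff.schnaps.pojo

// Sketch of the POJO used above (field types are an assumption).
// Flink's CSV reader needs a no-argument constructor and public,
// mutable fields matching the names given in pojoFields.
class ActorMovie {
  var actor: String = _
  var film: String = _
  var character: String = _

  override def toString = s"$actor as $character in $film"
}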

This job just reads a TSV file from HDFS into a DataSet of POJOs and prints it. When I run it locally, everything works fine. But when I upload the .jar and run it on the cluster, the JobManager logs the following exceptions:

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT is not available on the TaskExecutor.

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file LOG is not available on the TaskExecutor.

And obviously the TaskManager contains no logs, which is the current problem.


Solution

  • When running Flink on Docker, the docker-entrypoint.sh script starts the Flink processes (TaskExecutor and JobMaster) in the foreground. As a consequence, Flink neither redirects its STDOUT into a file nor logs into a file; everything goes to STDOUT instead. That way, you can view the logs and the stdout output of your Docker containers via docker logs.

    If you want to change this behaviour, it should be sufficient to alter docker-entrypoint.sh and pass start instead of start-foreground:

    if [ "${CMD}" == "${TASK_MANAGER}" ]; then
        $FLINK_HOME/bin/taskmanager.sh start "$@"
    else
        $FLINK_HOME/bin/standalone-job.sh start "$@"
    fi
    
    sleep 1
    exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"
    

    Update

    When using Flink's DataSet API, calling the method DataSet::print will actually retrieve the respective DataSet from the cluster back to the client where it is printed to STDOUT. Due to the retrieval, this method only works if the job has been submitted by Flink's CLI client via bin/flink run <job.jar>. This behavior is different from the DataStream::print method which prints the DataStream on the TaskManagers where the program is executed.

    If you want to print the DataSet results on the TaskManagers, call DataSet::printOnTaskManager instead of print, as in the sketch below.
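
    A minimal sketch of that change, based on the job from the question (the "actor-movies" prefix is an arbitrary label prepended to each printed record):

    // Reuses the actorMovies DataSet from the job above.
    // printOnTaskManager writes every record to the TaskManagers' STDOUT
    // instead of fetching the DataSet back to the submitting client.
    actorMovies.printOnTaskManager("actor-movies")

    // Unlike print(), printOnTaskManager only registers a sink,
    // so the job still has to be triggered explicitly.
    env.execute("Flink Batch Scala API Skeleton")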