I started Flink with the following docker-compose.yml from the official Flink repository. I only added the connection to the external Hadoop network.
version: "2.1"

networks:
  hadoop:
    external:
      name: flink_hadoop

services:
  jobmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-jobmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-taskmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
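For completeness: because the hadoop network is declared as external, it has to exist before the stack is started. I create it roughly like this before running docker-compose up (network name taken from the compose file above):

```shell
# create_flink_network: create the external "flink_hadoop" network that the
# compose file above expects to already exist.
create_flink_network() {
  docker network create flink_hadoop
}
```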
After this everything runs and I can access the WebUI.
Then I packaged the following job.
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory

import stoff.schnaps.pojo.ActorMovie

object HdfsJob {

  private lazy val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val actorMovies = env
      .readCsvFile[ActorMovie](
        "hdfs://namenode:8020/source-data/test.tsv",
        "\r\n",
        "\t",
        includedFields = Array(2, 3, 5),
        pojoFields = Array("actor", "film", "character"))

    actorMovies.print

    // execute program
    env.execute("Flink Batch Scala API Skeleton")
  }
}
It just reads a TSV file from HDFS into a DataSet of POJOs and prints it. When I run this locally, everything works fine. But when I upload the .jar and run it on the cluster, the jobmanager logs the following exceptions:
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT is not available on the TaskExecutor.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file LOG is not available on the TaskExecutor.
And obviously the taskmanager contains no logs showing what the current problem is.
When running Flink on Docker, the docker-entrypoint.sh script will start the Flink processes (TaskExecutor and JobMaster) in the foreground. This has the effect that Flink will neither redirect its STDOUT into a file nor log into a file. Instead, Flink will also log to STDOUT. That way, you can view the logs and the stdout output of your Docker containers via docker logs.
If you want to change this behaviour, it should be sufficient to alter docker-entrypoint.sh and pass start instead of start-foreground:

if [ "${CMD}" == "${TASK_MANAGER}" ]; then
    $FLINK_HOME/bin/taskmanager.sh start "$@"
else
    $FLINK_HOME/bin/standalone-job.sh start "$@"
fi

sleep 1
exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"
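A sketch of applying that change when building a custom image, assuming the stock entrypoint still passes start-foreground (file name and pattern are taken from the stock flink image, not verified against every version):

```shell
# patch_entrypoint FILE: rewrite start-foreground to start so the Flink
# processes run in the background and log to files under $FLINK_HOME/log.
# Keeps a .bak copy of the original script.
patch_entrypoint() {
  sed -i.bak 's/start-foreground/start/g' "$1"
}
```

You would run this on a local copy of docker-entrypoint.sh and COPY the patched script into your derived image.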
When using Flink's DataSet API, calling the method DataSet::print will actually retrieve the respective DataSet from the cluster back to the client, where it is printed to STDOUT. Due to the retrieval, this method only works if the job has been submitted by Flink's CLI client via bin/flink run <job.jar>. This behavior is different from the DataStream::print method, which prints the DataStream on the TaskManagers where the program is executed.
If you want to print the DataSet results on the TaskManagers, you need to call DataSet::printOnTaskManager instead of print.