
How should I understand the WordCount example of Apache Flink?


I'm just starting to learn Apache Flink, and I'm working through the WordCount example: https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html

It works, but there is something I can't understand clearly.

Flink has three parts: the JobManager, the TaskManager, and the JobClient. As I understand it, the Java code of the class SocketWindowWordCount should be part of the JobClient: this class sends what it wants done to the JobClient, and the JobClient then sends the tasks to the JobManager.

Am I right?

If I'm right, I don't know which part of the code in SocketWindowWordCount.java is responsible for sending what it wants done to the JobClient.

Is listening on the port also part of the task that is sent to the JobManager and then to the TaskManager?

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    .keyBy("word")
    .timeWindow(Time.seconds(5), Time.seconds(1))
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });

// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);

Is all of the code above part of the task?

In short, I roughly understand Flink's architecture, but I'd like to know more details about how the JobClient works.


Solution

  • From an architectural point of view, your program itself is the JobClient. In particular, your program depends on the JobClient classes, which are used when you execute the DataStream program.

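    All of your code is the task definition: it builds a dataflow of operators (the socket source, your FlatMapFunction and ReduceFunction, the window, and the print sink) that gets serialized and sent to the JobManager, which distributes it to the TaskManagers. So listening on the port is part of the task too: the socket source runs inside a TaskManager, not in your client process.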

    You left out the "most" important part of the program:

    env.execute("Socket Window WordCount");

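    This call is what actually triggers the JobClient to package the DataStream program and send it to the configured JobManager. Until execute() is called, the statements above only build the program; no data is read or processed.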
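The two ideas above (building the pipeline only records your functions, and execute() ships them as serialized bytes to workers) can be illustrated without Flink at all. The plain-Java toy below is a sketch under that framing: every name in it (LazyJobSketch, ToyEnvironment, ToyWorker, SerializableFn) is hypothetical and is NOT Flink's API. The real client translates your program into a JobGraph and submits it over the network; this sketch stays in one JVM and only mimics the record, serialize, deserialize, run sequence.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy model of the client/worker split. All names are hypothetical; this is NOT Flink's API.
public class LazyJobSketch {

    // Counts how often user code has actually run (static, so deserialized lambdas share it).
    public static final AtomicInteger CALLS = new AtomicInteger();

    // A user function that can be shipped as bytes, loosely like Flink's serializable user functions.
    public interface SerializableFn extends Function<String, String>, Serializable {}

    // Hypothetical stand-in for the execution environment: building the pipeline only records operators.
    public static class ToyEnvironment {
        private final List<SerializableFn> operators = new ArrayList<>();

        public ToyEnvironment map(SerializableFn fn) {
            operators.add(fn); // recorded, not executed
            return this;
        }

        // Hypothetical stand-in for env.execute(): serialize the recorded "job", hand the bytes to a worker.
        public List<String> execute(List<String> input) throws IOException, ClassNotFoundException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new ArrayList<>(operators));
            }
            return ToyWorker.run(bytes.toByteArray(), input);
        }
    }

    // Hypothetical stand-in for a TaskManager: it only receives bytes and rebuilds the functions.
    public static class ToyWorker {
        @SuppressWarnings("unchecked")
        static List<String> run(byte[] job, List<String> input) throws IOException, ClassNotFoundException {
            List<SerializableFn> ops;
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(job))) {
                ops = (List<SerializableFn>) in.readObject();
            }
            List<String> results = new ArrayList<>();
            for (String record : input) {
                String value = record;
                for (SerializableFn op : ops) {
                    value = op.apply(value); // user code runs here, on the "worker"
                }
                results.add(value);
            }
            return results;
        }
    }

    public static void main(String[] args) throws Exception {
        ToyEnvironment env = new ToyEnvironment()
                .map(s -> { CALLS.incrementAndGet(); return s.trim(); })
                .map(s -> { CALLS.incrementAndGet(); return s.toLowerCase(); });
        // Building the pipeline ran no user code yet.
        System.out.println("calls before execute: " + CALLS.get());
        List<String> out = env.execute(Arrays.asList("  Hello ", "WORLD"));
        System.out.println("calls after execute: " + CALLS.get());
        System.out.println(out);
    }
}
```

Running main prints "calls before execute: 0", then "calls after execute: 4" and "[hello, world]": the two functions run only after the simulated worker has deserialized them, once per input record each.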