Tags: java, apache-flink, flink-streaming

Numbers in output of Flink WordCount in IntelliJ


I am learning Apache Flink and have integrated it into IntelliJ via Maven. I tried the WordCount example from GitHub, changing only the input text.

The main part of the code producing the output is:

DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(value -> value.f0)
                .sum(1);

// emit result
if (params.has("output")) {
    counts.writeAsText(params.get("output"));
} else {
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();
}
// execute program
env.execute("Streaming WordCount");

I get the following output in IntelliJ:

4> (name,1)
4> (years,1)
3> (hello,1)
3> (twice,1)
5> (the,1)
2> (i,1)
6> (my,1)
2> (am,1)
6> (florian,1)
7> (old,1)
2> (thirteen,1)
6> (word,1)
8> (is,1)
8> (is,2)
6> (florian,2)
6> (written,1)

So I have two questions:

  1. What do the "$NUMBER>" prefixes stand for? Are they the IDs of the workers in my Apache Flink cluster? Which line of the code produces them, and how can I remove them from the output? I couldn't find this in the docs.

  2. The word "florian" appears twice in the output, just as it does in the text. Is this because each subtask writes its intermediate results to the output, so that every time a word count increases, the new count is written? Is it possible to aggregate these so that only the final count is written?

I know these are very basic questions. I'm new to Apache Flink and to distributed processing frameworks in general, but I'm keen on learning. Thanks in advance! :)


Solution

    1. Yes, the "$NUMBER>" prefix shows which of the parallel instances (subtasks) of the sink produced that line of output. It comes from the PrintSink, which is what counts.print() uses and which really isn't meant to be used in production. If you instead supply an --output parameter, this example will use writeAsText to write to a file, and I don't believe the "$NUMBER>" prefixes will appear there. Alternatively, you could use the StreamingFileSink or the FileSink.

    2. You are running this application in STREAMING execution mode. In this mode, Flink has no way of knowing how much input it is going to see; it is designed to be able to run continuously, forever. Because it cannot wait for "the end" to produce a single, final report of the word counts, each input record instead causes an updated output record to be produced. If the application will only ever be given bounded input, you could run it in BATCH execution mode instead, in which case it runs to completion and reports only the final word counts. See https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html for details.
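
To illustrate point 1, here is a minimal plain-Java sketch of how a parallel print sink could end up producing the "$NUMBER>" prefix. It is a simplified model, not Flink's actual code: the class name PrintPrefixSketch, both method names, and the hash-based routing rule are assumptions made up for this example. As far as I know, the prefix is only added when the print sink runs with a parallelism greater than 1, so the comparable change in the real job would be counts.print().setParallelism(1).

```java
/** Simplified model of a parallel print sink; hypothetical, not Flink's implementation. */
final class PrintPrefixSketch {

    // Formats a record the way the output in the question looks:
    // "<subtask number>> <record>" when several subtasks print in parallel,
    // and just "<record>" when a single subtask does all the printing.
    static String format(int subtaskIndex, int parallelism, String record) {
        if (parallelism > 1) {
            // Subtask indices start at 0, the printed numbers start at 1.
            return (subtaskIndex + 1) + "> " + record;
        }
        return record;
    }

    // Stand-in routing rule (assumption): the same key always lands on the same
    // subtask. Flink uses its own key-group hashing, not this formula.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        String[] words = {"florian", "is", "florian"};
        for (int parallelism : new int[] {8, 1}) {
            System.out.println("parallelism = " + parallelism);
            for (String word : words) {
                int subtask = subtaskFor(word, parallelism);
                System.out.println(format(subtask, parallelism, word));
            }
        }
        // In the real job, the comparable change would be:
        // counts.print().setParallelism(1);
    }
}
```

Because records are routed by key, every update for a given word shows the same number, which matches "florian" always appearing behind "6>" in the question's output.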
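
To illustrate point 2, the difference between the two execution modes can be sketched in plain Java without any Flink dependency. This is only a model of the observable behaviour under stated assumptions: the class name RunningCountSketch and its two methods are hypothetical, and the real switch would be made in the job itself, for example with env.setRuntimeMode(RuntimeExecutionMode.BATCH), assuming Flink 1.12 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Models streaming-style versus batch-style word count output; hypothetical sketch. */
final class RunningCountSketch {

    // STREAMING-style: the end of the input is unknown, so every incoming
    // word immediately produces an updated (word,count) record.
    static List<String> streamingUpdates(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String word : words) {
            int updated = counts.merge(word, 1, Integer::sum);
            out.add("(" + word + "," + updated + ")");
        }
        return out;
    }

    // BATCH-style: the input is bounded, so all words are consumed first
    // and each word is reported exactly once with its final count.
    static List<String> batchResults(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        counts.forEach((word, count) -> out.add("(" + word + "," + count + ")"));
        return out;
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        words.add("is");
        words.add("florian");
        words.add("is");
        words.add("florian");

        System.out.println("streaming-style: " + streamingUpdates(words));
        System.out.println("batch-style:     " + batchResults(words));
        // In the real job, the comparable change would be (Flink 1.12+):
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }
}
```

The streaming-style method yields both (florian,1) and (florian,2), as in the question's output, while the batch-style method yields only the final count for each word.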