apache-flink flink-batch

How can I see which data is in each slot or operator instance?


I'm trying to get insight into the data inside each slot in Flink, to understand exactly how the data is distributed, but I find it really confusing to know where to look. I am working with a word-count example on a small text file. I want to know what data lands in each slot or, more specifically, which data each operator instance processes, perhaps by printing the data inside that operator or slot.

This is my code, which I am running in a local environment:

        // Batch (DataSet API) environment with a fixed parallelism of 4.
        ExecutionEnvironment ENV = ExecutionEnvironment.getExecutionEnvironment();
        // ENV.setParallelism(Runtime.getRuntime().availableProcessors());
        ENV.setParallelism(4);

        // Read the file line by line, tokenize, and write one output file per sink subtask.
        DataSet<String> input1 = ENV.readTextFile(inputPathTesting);
        DataSet<Tuple2<String, Integer>> wordTuples = input1
                .flatMap(new Tokenizer());
        wordTuples.writeAsText(outputPath);

        ENV.execute("WordCount");

I still don't really understand how data distribution in Flink works, or why, for example, some sinks have no data to write while others have double the amount. My goal is to understand how the data is distributed. Any advice or guidance would be helpful. Thanks in advance.

Note: I noticed that when reading from the text file (four lines of slightly different lengths), adding, for example, three letters to the shortest line changes how the data is distributed in the results.

Update: this is what I have been able to do so far:
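One plausible explanation for the effect described in this note, offered as a simplified model rather than Flink's actual code: file sources divide the input into byte ranges (input splits) of roughly equal size, and a line is read by whichever split contains its first byte. With only four short lines, a small change in one line's length moves the range boundaries, so some ranges can hold two line starts and others none. The sketch below is plain Java with no Flink dependency. The class name SplitSketch, its helper methods, and the line lengths are all made up for illustration, and real boundary handling and the assignment of splits to subtasks differ in detail.

```java
import java.util.Arrays;

public class SplitSketch {

    /** Builds a line of n copies of c (a stand-in for real text). */
    static String line(char c, int n) {
        char[] chars = new char[n];
        Arrays.fill(chars, c);
        return new String(chars);
    }

    /**
     * Simplified model (an assumption, not Flink source code): cut the file
     * into numSplits equal byte ranges and give each line to the range that
     * contains the line's first byte.
     */
    static int[] splitOfEachLine(String[] lines, int numSplits) {
        // File size in bytes, assuming ASCII text and one '\n' per line.
        long total = 0;
        for (String l : lines) {
            total += l.length() + 1;
        }
        // Range size, rounded up so that numSplits ranges cover the file.
        long splitSize = (total + numSplits - 1) / numSplits;

        int[] owner = new int[lines.length];
        long offset = 0; // byte offset at which the current line starts
        for (int i = 0; i < lines.length; i++) {
            owner[i] = (int) (offset / splitSize);
            offset += lines[i].length() + 1;
        }
        return owner;
    }

    public static void main(String[] args) {
        // Four lines of slightly different lengths (hypothetical data).
        String[] before = {line('a', 8), line('b', 13), line('c', 4), line('d', 15)};
        // The same file with three letters added to the shortest line.
        String[] after = {line('a', 8), line('b', 13), line('c', 7), line('d', 15)};

        System.out.println(Arrays.toString(splitOfEachLine(before, 4)));
        System.out.println(Arrays.toString(splitOfEachLine(after, 4)));
    }
}
```

With these made-up lengths the first call yields [0, 0, 2, 2], so ranges 1 and 3 receive no line at all, and the second yields [0, 0, 1, 2]. Under this model, three extra letters are enough to move a line into a different range.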

    DataSet<Tuple2<String, Integer>> textData = input
            .filter(line -> !line.isEmpty())
            .flatMap(new Tokenizer())
            .map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
                    // Replace the second field with the (0-based) index of the
                    // parallel subtask that is running this map function.
                    int taskId = getRuntimeContext().getIndexOfThisSubtask();
                    return new Tuple2<>(value.f0, taskId);
                }
            });

And the Tokenizer:

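    // At the top of the file:
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Extending RichFlatMapFunction gives access to the RuntimeContext directly,
    // so no dummy map() method is needed.
    public static class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Index of the parallel subtask running this function, shifted to be 1-based.
            int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask() + 1;
            if (!value.isEmpty()) { // ignore empty lines
                out.collect(new Tuple2<>(value, subtaskIndex));
            }
        }
    }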

So by using a rich function such as RichMapFunction I get access to the RuntimeContext, and therefore to the index of the subtask, and I can then emit each line together with the index of the subtask (task slot) that processed it. Is this the right way to do it? And is there a better way to see which data lands in each slot?


Solution

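  • If you use print() rather than writeAsText, each line of output will be prefixed with the (1-based) index of the subtask that produced it. Note that this describes print() in the DataStream API; in the DataSet API, print() collects the results on the client, and printOnTaskManager() is the variant that prints on the workers with a subtask prefix. Something like this: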

    2> (1577883600000,2013000185,33.0)
    4> (1577883600000,2013000108,14.0)
    3> (1577883600000,2013000087,14.0)
    1> (1577883600000,2013000036,23.0)
    4> (1577883600000,2013000072,13.0)
    2> (1577883600000,2013000041,28.0)
    3> (1577883600000,2013000123,33.0)
    4> (1577883600000,2013000188,18.0)
    1> (1577883600000,2013000098,23.0)
    2> (1577883600000,2013000047,13.0)
    ...
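To turn prefixed output like the sample above into a per-subtask summary, the lines can be grouped by their prefix. The following is a minimal plain-Java sketch and not part of Flink: the class SubtaskTally and the method groupBySubtask are hypothetical names, and it assumes every record line has the form "N> record" as in the sample (lines with any other prefix are skipped).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SubtaskTally {

    /** Groups lines of the form "N> record" by their numeric prefix N. */
    static Map<Integer, List<String>> groupBySubtask(List<String> lines) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String line : lines) {
            int sep = line.indexOf("> ");
            if (sep <= 0) {
                continue; // no prefix found, e.g. a log line
            }
            try {
                int subtask = Integer.parseInt(line.substring(0, sep).trim());
                bySubtask.computeIfAbsent(subtask, k -> new ArrayList<>())
                         .add(line.substring(sep + 2));
            } catch (NumberFormatException e) {
                // The prefix was not a plain number; skip this line.
            }
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // The first lines of the sample output shown above.
        List<String> output = Arrays.asList(
                "2> (1577883600000,2013000185,33.0)",
                "4> (1577883600000,2013000108,14.0)",
                "3> (1577883600000,2013000087,14.0)",
                "1> (1577883600000,2013000036,23.0)",
                "4> (1577883600000,2013000072,13.0)");

        // Print how many records each subtask emitted, in subtask order.
        groupBySubtask(output).forEach((subtask, records) ->
                System.out.println("subtask " + subtask + ": " + records.size() + " record(s)"));
    }
}
```

A subtask that never appears as a key in the resulting map produced no output, which is a quick way to spot the empty sinks mentioned in the question.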