apache-flink, flink-streaming

Flink keyBy grouping issue


I'm pretty new to Flink. I have this code that maps, groups, and sums input JSON.

It's very similar to the word count example.

I expected to get (vacant,1) (occupied,2)

But for some reason I'm getting (occupied,1) (vacant,1) (occupied,2)

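    // Jackson databind is assumed here; Flink's shaded Jackson copy works the same way
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public static void main(String[] args) throws Exception {
        String s = "{\n" +
                "    \"Port_128\": \"occupied\",\n" +
                "    \"Port_129\": \"occupied\",\n" +
                "    \"Port_120\": \"vacant\"\n" +
                "\n" +
                "}";
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> in = env.fromElements(s);
        SingleOutputStreamOperator<Tuple2<String, Integer>> t =
                in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        // Emit (portState, 1) for every value in the JSON object
                        ObjectMapper mapper = new ObjectMapper();
                        JsonNode node = mapper.readTree(s);
                        node.elements().forEachRemaining(v ->
                                collector.collect(new Tuple2<>(v.textValue(), 1)));
                    }
                })
                .keyBy(0)   // partition by field 0 (the port state)
                .sum(1);    // running sum of field 1 for each key

        t.print();
        env.execute();
    }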

Solution

  • Running your code, I get:

    10/19/2017 11:27:38 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING 
    (occupied,1)
    (occupied,2)
    (vacant,1)
    10/19/2017 11:28:03 Keyed Aggregation -> Sink: Unnamed(1/1) switched to FINISHED 
    

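    The order differs slightly from your output, but the behavior is the same. A streaming keyBy(0).sum(1) is a rolling aggregation: it emits the updated sum for a key every time a record with that key arrives. The first "occupied" produces (occupied,1), the second updates that key's sum to (occupied,2), and "vacant" is a different key with its own running sum, so it produces (vacant,1). So this is the correct output. Records with different keys can be processed by different parallel instances, so the interleaving between keys (whether (vacant,1) is printed before or after (occupied,2)) is not deterministic, which is likely why your output is ordered differently from mine. Within a single key, the order is preserved.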
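To make the rolling behavior concrete, here is a minimal plain-Java model of what keyBy(0).sum(1) does with the three port states. It is only a sketch that mimics the semantics (one running total per key, one output record per input record); it is not Flink's implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RollingKeyedSum {

    // Hypothetical model of keyBy(0).sum(1): keeps one running total per key
    // and emits the updated total every time a record for that key arrives.
    static List<String> rollingSum(List<String> values) {
        Map<String, Integer> perKeyState = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String value : values) {
            // Like summing field 1 of Tuple2<>(value, 1) for this key
            int updated = perKeyState.merge(value, 1, Integer::sum);
            emitted.add("(" + value + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The three port states from the JSON, in document order
        List<String> ports = List.of("occupied", "occupied", "vacant");
        rollingSum(ports).forEach(System.out::println);
    }
}
```

Running it prints (occupied,1), (occupied,2), (vacant,1), one line each: every intermediate sum is emitted, which is exactly the "extra" (occupied,1) in the question.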

    EDIT

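    Per the comment below, here is code that gives the desired output. It uses the batch DataSet API, where groupBy(0).sum(1) runs over the whole bounded input and emits only the final sum for each key: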

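    // Jackson databind is assumed here; Flink's shaded Jackson copy works the same way
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.AggregateOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public static void main(String[] args) throws Exception {
      String s = "{\n" +
          "    \"Port_128\": \"occupied\",\n" +
          "    \"Port_129\": \"occupied\",\n" +
          "    \"Port_120\": \"vacant\"\n" +
          "\n" +
          "}";
      ExecutionEnvironment env =
          ExecutionEnvironment.getExecutionEnvironment();
      DataSet<String> in = env.fromElements(s);
      AggregateOperator<Tuple2<String, Integer>> t =
          in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
                throws Exception {
              // Emit (portState, 1) for every value in the JSON object
              ObjectMapper mapper = new ObjectMapper();
              JsonNode node = mapper.readTree(s);
              node.elements().forEachRemaining(v ->
                  collector.collect(new Tuple2<>(v.textValue(), 1)));
            }
          }).groupBy(0).sum(1);   // one final sum per key over the whole DataSet

      // In the DataSet API, print() triggers execution itself. Calling
      // env.execute() afterwards would fail because no new sinks were defined.
      t.print();
    }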
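For comparison with the rolling model, the batch semantics of groupBy(0).sum(1) can be sketched in plain Java the same way: consume all of the bounded input first, then emit one total per key. Again, this only models the behavior and is not Flink code; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchGroupSum {

    // Hypothetical model of groupBy(0).sum(1) on a bounded DataSet:
    // all records are aggregated before anything is emitted.
    static Map<String, Integer> groupSum(List<String> values) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String value : values) {
            totals.merge(value, 1, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> ports = List.of("occupied", "occupied", "vacant");
        // Only final totals are printed: one line per key
        groupSum(ports).forEach((key, total) ->
                System.out.println("(" + key + "," + total + ")"));
    }
}
```

The per-key arithmetic is identical to the streaming version; the only difference is that intermediate sums are never emitted, which is why this yields just (occupied,2) and (vacant,1).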