java, apache-flink, flink-streaming

Flink streaming, what exactly does 'sum' do?


I'm having trouble understanding streaming. Take WordCount as an example: for an infinite source like Kafka, what exactly does 'sum' do?

DataStream<Tuple2<String, Long>> counts = input
                ......
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(0)
                .sum(1);

I kinda understand it when there's a time window: it's like a 'batch' to me, since it has a start and an end time. But when there's no time window at all:

  1. What is the start time and end time?
  2. When Flink receives the word 'foo' for the 3rd time, does 'sum' go through all the old 'foo' records, compute 1+1+1, and give the result '3'? Or does Flink somehow save an intermediate result '2' from the previous step, so that 'sum' only does 2+1?
  3. Is there an alternative way to do the sum, for example with keyBy(0).process(...)?

Solution

  • The specified program will translate to a StreamGroupedReduce with a SumAggregator. The StreamGroupedReduce continuously reduces the incoming data stream and outputs the new reduced value after every incoming record.

    Internally, the StreamGroupedReduce uses a ValueState which keeps the current reduced value for each key. Whenever a new record arrives, the current reduced value is combined with the incoming record by calling the ReduceFunction (in your case SumAggregator). The result of this operation is then stored in the operator's ValueState and output to downstream consumers. Earlier records are not kept or re-read, so for the third 'foo' the operator only computes 2+1.

    For example: the input stream 1, 2, 3, 4, 5 will generate the following output when being summed: 1, 3, 6, 10, 15.

    If you want, you can implement the same behaviour yourself with keyBy(0).process(...).
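
To make the "stored intermediate result" idea concrete, here is a minimal plain-Java sketch of the same logic, with no Flink dependency. It is only an illustration, not Flink's actual implementation: the class RunningSum and its methods add and runningSums are hypothetical names, and the HashMap stands in for the per-key ValueState described above. In a real job, the body of add is roughly what you would put into the process function, reading and updating keyed state instead of a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for keyBy(0).sum(1); hypothetical class, not Flink code. */
final class RunningSum {
    // Assumption: this map plays the role of Flink's keyed ValueState,
    // holding exactly one running total per key.
    private final Map<String, Long> state = new HashMap<>();

    /** Folds one record into the stored total for its key and returns the new total. */
    long add(String key, long value) {
        // Only the previous total is read (e.g. 2 + 1), never the old records.
        long updated = state.getOrDefault(key, 0L) + value;
        state.put(key, updated);
        return updated; // one output per incoming record
    }

    /** Feeds the values for a single key through add and collects every emitted total. */
    static List<Long> runningSums(String key, long... values) {
        RunningSum sum = new RunningSum();
        List<Long> out = new ArrayList<>();
        for (long v : values) {
            out.add(sum.add(key, v));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runningSums("foo", 1, 2, 3, 4, 5)); // prints [1, 3, 6, 10, 15]
    }
}
```

Because the state holds a single number per key, its size depends on the number of distinct keys, not on how many records have been seen. That is why this works on an unbounded source without any window.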