apache-flink

What's the difference between max and maxBy in KeyedStream?


  1. KeyedStream#max(String field)

Applies an aggregation that gives the current maximum of the data stream at the given field expression by the given key. An independent aggregate is kept per key. A field expression is either the name of a public field or a getter method with parentheses of the DataStream's underlying type. A dot can be used to drill down into objects, as in "field1.fieldxy".

  2. KeyedStream#maxBy(String field)

Applies an aggregation that gives the current element with the maximum value at the given position by the given key. An independent aggregate is kept per key. If more elements have the maximum value at the given position, the operator returns the first one by default.

The javadocs for these two APIs look very similar. What is the difference between them, and when should I choose one over the other?


Solution

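  • The difference between max and maxBy is that max returns the maximum value, whereas maxBy returns the element that has the maximum value in this field. With max, only the aggregated field is guaranteed to hold the maximum; the other fields of the emitted element are not guaranteed to come from the record that actually had that maximum. Use maxBy when you need the whole record (for example, the timestamp of the hottest reading), and max when you only care about the value itself.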

     keyedStream.max(0);
     keyedStream.max("key");
     keyedStream.maxBy(0);
     keyedStream.maxBy("key");
    

    The following examples also illustrate the difference:

    Using max:

      // Create a tumbling window with a length of 1 day:
      .timeWindow(Time.of(1, TimeUnit.DAYS))
      // Use the maximum temperature of the day. Only the temperature field is
      // guaranteed to be the maximum; the other fields may come from another record:
      .max("temperature")
      // Perform an identity map, because we want to write all values of this day to the database:
      .map(new MapFunction<elastic.model.LocalWeatherData, elastic.model.LocalWeatherData>() {
          @Override
          public elastic.model.LocalWeatherData map(elastic.model.LocalWeatherData localWeatherData) throws Exception {
              return localWeatherData;
          }
      });
    

    Using maxBy:

      // Now take the maximum temperature per day from the KeyedStream:
      DataStream<LocalWeatherData> maxTemperaturePerDay =
              localWeatherDataByStation
                      // Use a non-overlapping tumbling window with a length of 1 day:
                      .timeWindow(Time.days(1))
                      // Emit the whole record that has the maximum temperature:
                      .maxBy("temperature");
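To see the difference without running a Flink job, here is a plain-Java model of the two rolling aggregations on a keyed stream (Java 16+ for records). It is a sketch, not Flink's implementation: `Reading`, `max`, `maxBy` and `rollingPerKey` are hypothetical names. The `max` model assumes the behaviour described above, namely that only the aggregated field is guaranteed to be the maximum. Here the other fields are kept from the first record seen for the key. The `maxBy` model keeps the first record on ties, as the javadoc states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class MaxVsMaxBy {

    // Hypothetical stand-in for LocalWeatherData: key, timestamp, aggregated field.
    public record Reading(String station, String time, double temperature) {}

    // Model of max("temperature") (an assumption, not Flink code): the first record
    // for a key is the accumulator, and only its temperature is overwritten when a
    // larger value arrives. The other fields stay those of the first record.
    public static Reading max(Reading acc, Reading next) {
        if (next.temperature() > acc.temperature()) {
            return new Reading(acc.station(), acc.time(), next.temperature());
        }
        return acc;
    }

    // Model of maxBy("temperature"): keep the whole record holding the largest
    // temperature; on a tie, keep the earlier record ("the first one by default").
    public static Reading maxBy(Reading acc, Reading next) {
        return next.temperature() > acc.temperature() ? next : acc;
    }

    // Rolling aggregation with independent state per key: after every input
    // element, emit the updated aggregate for that element's key.
    public static List<Reading> rollingPerKey(List<Reading> input, BinaryOperator<Reading> agg) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> emitted = new ArrayList<>();
        for (Reading r : input) {
            // merge stores r if the key is new, otherwise stores agg(old, r).
            emitted.add(state.merge(r.station(), r, agg));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Reading> day = List.of(
                new Reading("S1", "06:00", 12.0),
                new Reading("S1", "14:00", 25.0),
                new Reading("S1", "18:00", 25.0),
                new Reading("S1", "22:00", 15.0));

        List<Reading> viaMax = rollingPerKey(day, MaxVsMaxBy::max);
        List<Reading> viaMaxBy = rollingPerKey(day, MaxVsMaxBy::maxBy);

        // The last emitted value is the aggregate after the whole day.
        System.out.println("max   -> " + viaMax.get(viaMax.size() - 1));
        // prints "max   -> Reading[station=S1, time=06:00, temperature=25.0]"
        System.out.println("maxBy -> " + viaMaxBy.get(viaMaxBy.size() - 1));
        // prints "maxBy -> Reading[station=S1, time=14:00, temperature=25.0]"
    }
}
```

In this model both operators report 25.0, but only `maxBy` pairs it with a timestamp at which 25.0 was actually measured (14:00, the first of the two tied readings). `max` reports 25.0 alongside the 06:00 timestamp of the day's first reading. This is why `maxBy` is the safer choice whenever the rest of the record matters.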