apache-flink, flink-sql

Does Apache Flink use previous state in order to recalculate aggregations?


I created a purchases table with the Kafka connector, using Flink SQL:

CREATE TABLE purchases (
  country STRING,
  product STRING
) WITH (
   'connector' = 'kafka',
   'topic' = 'purchases',
   'properties.bootstrap.servers' = 'kafka:29092',
   'value.format' = 'json',
   'properties.group.id' = '1',
   'scan.startup.mode' = 'earliest-offset'
);

and then I perform the following aggregation in Apache Flink:

SELECT `country`, `product`, `purchases`
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
  FROM (
    SELECT country, product, COUNT(*) AS `purchases`
    FROM purchases
    GROUP BY country, product
  )
)
WHERE row_num <= 3;

There are essentially two calculations above:

  1. grouping of purchases by country and product (select country, product, count(*) as purchases from purchases group by country, product)
  2. a Top-N aggregation that returns the top 3 products per country.

I wonder what happens on each new row in the purchases table: does Flink recompute the grouping for all countries and products and then recompute the top 3 products for every country, or is it smart enough to recompute only the group the new record belongs to, and to recompute the top 3 rows only for that record's country?

I assume Flink is smart enough in both cases, because the overhead of recalculating all of the rows would be very high; however, I couldn't find any explicit information about this in the documentation.


Solution

  • Flink uses state to avoid recalculations; instead, it computes the desired results incrementally.

    In the case of

    select country, product, count(*) as `purchases`
    from purchases
    group by country, product
    

    Flink will maintain a counter for every distinct country/product pair, and increment the appropriate counter for each incoming event.
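
    You can verify this yourself by asking Flink for the plan. As a rough sketch (recent Flink versions accept EXPLAIN directly in front of a query; older versions use EXPLAIN PLAN FOR):

    EXPLAIN
    SELECT `country`, `product`, `purchases`
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
      FROM (
        SELECT country, product, COUNT(*) AS `purchases`
        FROM purchases
        GROUP BY country, product
      )
    )
    WHERE row_num <= 3;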

    Using EXPLAIN on your query reveals what the optimized execution plan looks like:

    Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[country], orderBy=[purchases DESC], select=[country, product, purchases])
    +- Exchange(distribution=[hash[country]])
       +- GroupAggregate(groupBy=[country, product], select=[country, product, COUNT(*) AS purchases])
          +- Exchange(distribution=[hash[country, product]])
             +- TableSourceScan(table=[[default_catalog, default_database, purchases]], fields=[country, product])
    

    This tells us that

    • the input is an append-only stream (rather than an updating, CDC stream)
    • that stream is then hash-partitioned on (country, product)
    • a GroupAggregate operator is then used to implement the partitioned counting
    • that GroupAggregate operator produces an updating stream (illustrated after this list), which is then re-hash-partitioned by country
    • a Rank operator consumes that updating count stream, and produces the final result using the UpdateFastStrategy (this strategy takes advantage of knowing that the incoming update stream won't contain any deletions, and that the updates will be monotonically increasing)
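
    To make that updating stream concrete, here is an illustrative (made-up) changelog the GroupAggregate might emit as purchases for one country/product pair arrive; +I/-U/+U are the insert, retract-update, and update row kinds Flink uses internally:

    +I[US, phone, 1]   -- first (US, phone) purchase: insert count 1
    -U[US, phone, 1]   -- second purchase: retract the old count ...
    +U[US, phone, 2]   -- ... and emit the updated count
    -U[US, phone, 2]   -- third purchase: retract again ...
    +U[US, phone, 3]   -- ... and emit the new count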

    In general, if you wish to reduce the amount of state being kept, and can accept that the results may become less than completely accurate as a result, you can configure Flink's Table/SQL API to expire idle state (see the example below). But I have no idea how that would affect the Rank operator.
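
    For example, in the SQL Client that would look roughly like this (a sketch, assuming a recent Flink version where the table.exec.state.ttl option is available; older versions use the idle state retention settings on TableConfig instead):

    -- expire state entries that have not been accessed for 36 hours
    SET 'table.exec.state.ttl' = '36 h';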