I created a purchases table using the Kafka connector in Flink SQL:
CREATE TABLE purchases (
  country STRING,
  product STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'purchases',
  'properties.bootstrap.servers' = 'kafka:29092',
  'value.format' = 'json',
  'properties.group.id' = '1',
  'scan.startup.mode' = 'earliest-offset'
);
and then I perform the following aggregation in Apache Flink:
SELECT `country`, `product`, `purchases`
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
  FROM (
    select country, product, count(*) as `purchases`
    from purchases
    group by country, product
  )
)
WHERE row_num <= 3;
There are essentially two calculations above: the inner aggregation
select country, product, count(*) as `purchases` from purchases group by country, product
and the outer top-3 ranking per country (the ROW_NUMBER() over each country partition).
I wonder what happens on each new row in the purchases table: does Flink recompute the grouping for all countries and products and then recompute the top 3 products for every country, or is Flink smart enough to update only the group the new record belongs to, and to recompute the top 3 only for the country that record belongs to?
I assume Flink is smart enough in both cases, because the overhead of recalculating all of the rows would be very high; however, I couldn't find any explicit information about this in the documentation.
Flink uses state to avoid recalculation, and instead computes the desired results incrementally.
In the case of
select country, product, count(*) as `purchases`
from purchases
group by country, product
Flink will maintain a counter for every distinct country/product pair, and increment the appropriate counter for each incoming event.
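As a rough mental model of that per-key counter (a plain-Python sketch, not Flink's actual implementation; the names `counts` and `on_purchase` are made up for illustration), each incoming row touches exactly one entry and produces one updated result row:

```python
from collections import defaultdict

# Hypothetical stand-in for Flink's keyed state: one counter per (country, product).
counts = defaultdict(int)

def on_purchase(country, product):
    """Handle one incoming row; only the matching key's counter is touched."""
    key = (country, product)
    counts[key] += 1
    # Emit the updated result row for this key only; other keys are not revisited.
    return (country, product, counts[key])

events = [("US", "apple"), ("US", "pear"), ("DE", "apple"), ("US", "apple")]
updates = [on_purchase(country, product) for country, product in events]
print(updates)
```

The fourth event only bumps the ("US", "apple") counter from 1 to 2; the counters for the other pairs are left alone.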
Using EXPLAIN on your query reveals what the optimized execution plan looks like:
Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[country], orderBy=[purchases DESC], select=[country, product, purchases])
+- Exchange(distribution=[hash[country]])
   +- GroupAggregate(groupBy=[country, product], select=[country, product, COUNT(*) AS purchases])
      +- Exchange(distribution=[hash[country, product]])
         +- TableSourceScan(table=[[default_catalog, default_database, purchases]], fields=[country, product])
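This tells us that the stream is first hash-partitioned by country and product for the GroupAggregate, and that its output is then hash-partitioned by country for the Rank operator, which keeps only ranks 1 through 3. Both operators work on keyed data, so a new record only touches the counter for its own country/product pair, and then the ranking for its own country.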
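To make the Rank stage in the plan above more concrete, here is a simplified plain-Python sketch of a per-country top 3 that is refreshed only for the country whose count changed. It is an illustration under assumptions, not the data structure Flink uses for UpdateFastStrategy; `per_country` and `on_count_update` are hypothetical names.

```python
from collections import defaultdict

TOP_N = 3

# Hypothetical per-country state: product -> latest count from the aggregation.
per_country = defaultdict(dict)

def on_count_update(country, product, purchases):
    """Apply one updated count and return the new top 3 for that country only."""
    state = per_country[country]
    state[product] = purchases
    # Only this country's products are re-ranked; other countries are untouched.
    ranked = sorted(state.items(), key=lambda item: item[1], reverse=True)
    return ranked[:TOP_N]

on_count_update("US", "apple", 1)
on_count_update("US", "pear", 1)
de_top = on_count_update("DE", "apple", 1)
us_top = on_count_update("US", "pear", 2)
print(us_top)
print(de_top)
```

Updating the count for "pear" in "US" reorders the US ranking, while the state held for "DE" is never read or rewritten.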
In general, if you wish to reduce the amount of state being kept, and can accept the risk of the results being less than completely accurate as a result, you can configure Flink's Table/SQL API to expire idle state. But I have no idea how that would affect the Rank operator.
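The accuracy trade-off for the counting step can be sketched in plain Python. In Flink the idle state retention is set through the `table.exec.state.ttl` option; the simulation below is only a model of the documented behaviour (a key whose state has expired is treated as if it were seen for the first time), with a made-up `TTL_SECONDS` value and hypothetical names `ttl_state` and `on_purchase_with_ttl`. It says nothing about how the Rank operator reacts.

```python
TTL_SECONDS = 60  # illustrative value, not a recommendation

# Hypothetical keyed state: key -> (count, time of last update in seconds).
ttl_state = {}

def on_purchase_with_ttl(key, now):
    """Count one event for key, forgetting the old count if the key was idle too long."""
    count, last_update = ttl_state.get(key, (0, now))
    if now - last_update > TTL_SECONDS:
        count = 0  # idle state expired: earlier purchases are no longer counted
    count += 1
    ttl_state[key] = (count, now)
    return count

first = on_purchase_with_ttl(("US", "apple"), now=0)
second = on_purchase_with_ttl(("US", "apple"), now=10)
after_idle = on_purchase_with_ttl(("US", "apple"), now=100)
print(first, second, after_idle)
```

Three purchases were made in total, but after 90 idle seconds the counter restarts at 1, which is the kind of inaccuracy you accept in exchange for bounded state.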