I'm wondering how much state Apache Flink uses for Top-N queries and tables in general.
First, I'm using Flink SQL to process messages from a Kafka topic:
CREATE TABLE purchases (
  country STRING,
  product STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'purchases',
  'properties.bootstrap.servers' = 'kafka:29092',
  'value.format' = 'json',
  'properties.group.id' = '1',
  'scan.startup.mode' = 'earliest-offset'
);
I also defined a JDBC sink table:
CREATE TABLE aggregations (
  `country` STRING,
  `product` STRING,
  `purchases` BIGINT NOT NULL,
  PRIMARY KEY (`country`, `product`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres',
  'table-name' = 'aggregations'
);
Finally, I start the aggregation:
INSERT INTO aggregations
SELECT `country`, `product`, `purchases`
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
  FROM (
    SELECT country, product, COUNT(*) AS `purchases`
    FROM purchases
    GROUP BY country, product))
WHERE row_num <= 3;
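To make the query's semantics concrete, here is a toy, batch-style Python model of what the INSERT computes. It is not how Flink executes the query (Flink processes the stream incrementally and keeps intermediate results in state); the function name `top_n_per_country` and the dict-shaped input records are assumptions for illustration. Ties are broken by product name here, which is also an assumption, since ROW_NUMBER() does not guarantee an order among equal counts.

```python
from collections import Counter, defaultdict


def top_n_per_country(purchases, n=3):
    """Toy batch model of the query (hypothetical helper, not a Flink API):
    count per (country, product), then keep the n products with the
    highest count in each country."""
    # Inner query: SELECT country, product, COUNT(*) ... GROUP BY country, product
    counts = Counter((p["country"], p["product"]) for p in purchases)

    # Outer query: ROW_NUMBER() OVER (PARTITION BY country ORDER BY purchases DESC)
    by_country = defaultdict(list)
    for (country, product), cnt in counts.items():
        by_country[country].append((product, cnt))

    result = []
    for country, rows in by_country.items():
        # Assumption: ties broken by product name; Flink does not guarantee this.
        rows.sort(key=lambda r: (-r[1], r[0]))
        for row_num, (product, cnt) in enumerate(rows, start=1):
            if row_num <= n:  # WHERE row_num <= 3
                result.append((country, product, cnt))
    return result
```

For example, if "DE" has products with counts 3, 2, 1 and 1, only the first three (after the tie-break) end up in the result for "DE".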
The Flink state management docs say:
Conceptually, source tables are never kept entirely in state. An implementer deals with logical tables (i.e. dynamic tables). Their state requirements depend on the used operations.
So am I right that Flink doesn't store the rows of the purchases table read by the Kafka connector?
More importantly, in the aggregation:
select country, product, count(*) as `purchases` from purchases group by country, product
does Flink keep every (country, product) key in state?
Flink translates SQL/Table API programs into DataStream/DataSet operators. For example, the purchases table in SQL is converted into a FlinkKafkaConsumer in the DataStream API.
You are right. Flink does not save the rows read from Kafka into state; instead, it saves the Kafka offsets into state.
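A rough way to picture the source's state is the toy Python model below: the only thing remembered per Kafka partition is a position, so the state size depends on the number of partitions, not on the number of records read. `ToyKafkaSourceState` is a hypothetical class for illustration, not a Flink API, and it simply stores the last processed offset; the exact offset bookkeeping inside Flink's Kafka connector may differ.

```python
from dataclasses import dataclass, field


@dataclass
class ToyKafkaSourceState:
    """Hypothetical toy model: the source's checkpointed state is one
    offset per (topic, partition), not the records themselves."""
    offsets: dict = field(default_factory=dict)

    def on_record(self, topic, partition, offset, value):
        # The record is forwarded downstream; only its position is remembered
        # (here: the last processed offset). The value itself is not stored.
        self.offsets[(topic, partition)] = offset
        return value

    def snapshot(self):
        # What a checkpoint would contain in this model: a small offset map.
        return dict(self.offsets)
```

However many records pass through, the snapshot in this model holds one entry per partition.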
For the SELECT ... GROUP BY statement, yes: Flink saves the keys and their values (the counts) in state.
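For the GROUP BY, the state can be pictured as a map from key to running count, as in the hypothetical `count_state` function below (a toy Python model, not Flink's implementation, which keeps this data in a state backend). What it illustrates is that the state grows with the number of distinct (country, product) pairs, not with the number of purchase events.

```python
def count_state(events):
    """Hypothetical toy model of the GROUP BY operator's keyed state:
    one entry per distinct (country, product) key holding its running count."""
    state = {}
    for country, product in events:
        key = (country, product)
        # Each event only updates the count stored under its key.
        state[key] = state.get(key, 0) + 1
    return state
```

With 5 countries and 4 products, for instance, this map never has more than 20 entries, whether it has seen a thousand events or a billion.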