Using Flink SQL, how can I get the first record and the last record by eventtime in a Kafka data flow and store them in a DB (such as MySQL)? In addition, when a new record arrives in the Kafka data flow, the corresponding record in MySQL should be updated.

The Kafka records are as follows:

{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
{'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
{'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
With Flink SQL, the results I expect are as follows:

{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
In MySQL, assume the stored result is as follows:

| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| --- | --- | --- | --- | --- |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 16:10:00 | 18 |
When a new Kafka record arrives,

{'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}

the row for 'are' in MySQL should be updated. The updated result is as follows:

| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| --- | --- | --- | --- | --- |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 17:18:00 | 30 |
I am having trouble with the 2nd and 5th steps. Can anyone give me some ideas?
Deduplication ordered by rowtime would be the easiest way, but it is only supported since Flink 1.12: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication
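-- Kafka source table with an event-time attribute and watermark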
CREATE TABLE src (
word STRING,
eventtime TIMESTAMP(3),
appear_page INT,
WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
...
);
-- get last row by word key
SELECT word, eventtime, appear_page
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
  FROM src
) WHERE rownum = 1;
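For the first appearance, the same pattern with ascending order should work (a sketch mirroring the query above):

-- get first row by word key
SELECT word, eventtime, appear_page
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime ASC) AS rownum
  FROM src
) WHERE rownum = 1;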
These queries should also work in 1.11, but there they are not optimized into a deduplication; they are executed with a TopN operator instead, which is less efficient.
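To get both results into MySQL, one option is to wrap the two deduplication queries in views and join them on word, writing into a JDBC upsert sink. This is only a sketch: the sink table word_appearance, the view names first_rows/last_rows, and the connection options are placeholders, not tested code. Note that the rowtime attribute has to be cast to a plain TIMESTAMP before the join, because Flink does not allow rowtime attributes in the inputs of a regular join.

-- sketch of an upsert sink into MySQL via the JDBC connector;
-- URL, table name, and credentials are placeholders
CREATE TABLE word_appearance (
  word STRING,
  first_appearance_time TIMESTAMP(3),
  first_appearance_page INT,
  last_appearance_time TIMESTAMP(3),
  last_appearance_page INT,
  PRIMARY KEY (word) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'word_appearance',
  'username' = '...',
  'password' = '...'
);

-- views over the two deduplication queries (hypothetical names);
-- CAST materializes the rowtime attribute so a regular join is allowed
CREATE VIEW first_rows AS
SELECT word,
  CAST(eventtime AS TIMESTAMP(3)) AS eventtime,
  appear_page
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime ASC) AS rownum
  FROM src
) WHERE rownum = 1;

CREATE VIEW last_rows AS
SELECT word,
  CAST(eventtime AS TIMESTAMP(3)) AS eventtime,
  appear_page
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
  FROM src
) WHERE rownum = 1;

-- join first and last appearance per word and write into MySQL
INSERT INTO word_appearance
SELECT
  f.word,
  f.eventtime,
  f.appear_page,
  l.eventtime,
  l.appear_page
FROM first_rows AS f
JOIN last_rows AS l ON f.word = l.word;

Because the sink declares word as its primary key, Flink writes the join result in upsert mode, so a new record like {'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30} updates the existing row for 'are' in MySQL instead of appending a new one, which covers the update requirement in the question.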