apache-kafka apache-flink flink-sql

Flink SQL: how to get the first record and the last record by event time in a Kafka data flow and store them in a DB (such as GP or MySQL)?


In addition, when a new record arrives in the Kafka data flow, the corresponding row in MySQL should be updated.

  1. Assume the records in Kafka are as follows:
    {'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
    {'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
    {'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}      
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}

  2. With Flink SQL, the results I expect are as follows:
    {'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
  3. We store these records in MySQL; assume the stored result is as follows:
    | word  | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
    |-------|-----------------------|-----------------------|----------------------|----------------------|
    | hello | 2020-12-04 16:00:00   | 5                     | 2020-12-04 16:15:00  | 20                   |
    | are   | 2020-12-04 16:00:00   | 12                    | 2020-12-04 16:10:00  | 18                   |
  4. If a new record arrives in Kafka:
    {'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}
  5. I hope the record for 'are' in MySQL can be updated, with the following result:
    | word  | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
    |-------|-----------------------|-----------------------|----------------------|----------------------|
    | hello | 2020-12-04 16:00:00   | 5                     | 2020-12-04 16:15:00  | 20                   |
    | are   | 2020-12-04 16:00:00   | 12                    | 2020-12-04 17:18:00  | 30                   |

I am having trouble with steps 2 and 5. Can anyone give me some ideas?


Solution

  • Deduplication with ordering by rowtime would be the easiest way, but it is only supported starting from Flink 1.12: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication

    CREATE TABLE src (
      word STRING,
      eventtime TIMESTAMP(3),
      appear_page INT,
      -- event-time attribute; tolerate 1 second of out-of-order data
      WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
    ) WITH (
      'connector' = 'kafka',
      ...
    );
    
    -- get last row by word key
    SELECT word, eventtime, appear_page
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
      FROM src
    ) WHERE rownum = 1;
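
    Symmetrically, a query for the first row per word only flips the ordering direction (a sketch against the same src table as above):

    -- get first row by word key
    SELECT word, eventtime, appear_page
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime ASC) AS rownum
      FROM src
    ) WHERE rownum = 1;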
    
    

    These queries should also work in 1.11, but there they are not optimized into a deduplication; instead they are translated into a TopN operator, which is less efficient.
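
    To cover storing the result in MySQL and updating it when new records arrive (steps 3 and 5 above), one option is to join the first-row and last-row deduplication results and write them into a JDBC sink keyed by word, so that every change becomes an upsert. The following is a minimal sketch, not a tested pipeline: the JDBC connector options are abbreviated like the Kafka ones above, and word_appearance, first_row, and last_row are hypothetical names.

    -- wrap the two deduplication queries above as views;
    -- the CAST drops the rowtime attribute, which a regular join does not accept
    CREATE VIEW first_row AS
    SELECT word, CAST(eventtime AS TIMESTAMP(3)) AS eventtime, appear_page
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime ASC) AS rownum
      FROM src
    ) WHERE rownum = 1;

    CREATE VIEW last_row AS
    SELECT word, CAST(eventtime AS TIMESTAMP(3)) AS eventtime, appear_page
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
      FROM src
    ) WHERE rownum = 1;

    -- MySQL sink; the declared primary key makes the JDBC connector
    -- issue upserts, so new Kafka records update the existing row
    CREATE TABLE word_appearance (
      word STRING,
      first_appearance_time TIMESTAMP(3),
      first_appearance_page INT,
      last_appearance_time TIMESTAMP(3),
      last_appearance_page INT,
      PRIMARY KEY (word) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydb',  -- hypothetical database
      'table-name' = 'word_appearance',
      ...
    );

    INSERT INTO word_appearance
    SELECT f.word, f.eventtime, f.appear_page, l.eventtime, l.appear_page
    FROM first_row AS f
    JOIN last_row AS l ON f.word = l.word;

    The join of two updating streams produces a changelog; the upsert sink keyed by word absorbs it, so the MySQL row for 'are' is rewritten in place when the late record from step 4 arrives.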