
Flink SQL Unit Testing: How to Assign Watermark?


I'm writing a unit test for a Flink SQL statement that uses MATCH_RECOGNIZE. I'm setting up the test data like this:

Table data = tEnv.fromValues(DataTypes.ROW(
  DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
  DataTypes.FIELD("foobar", DataTypes.STRING()),
  ....
  ),
  row(...),
  row(...)
);

I have two questions:

  • How do I designate event_time as the rowtime attribute, i.e. the field used for watermarking?
  • Less important: how do I give the created table a meaningful name?

FLINK VERSION: 1.11


Solution

  • You hit a current limitation of the Table API (as of Flink 1.11): it's not possible to define watermarks and rowtime attributes in combination with the fromValues method; you need a connector. There are a couple of options to work around it:

    1. Use a CSV connector backed by a file that you populate with your test values, as shown in this example.

    2. Use the built-in DataGen connector. Since you're putting together a unit test for CEP, I imagine that you want some degree of control over the data that is generated, so this is probably not a viable option. I thought I'd mention it anyway.
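A minimal sketch of option 1, assuming the Flink 1.11 filesystem connector with the csv format and only the two columns shown in the question. The hypothetical helper CsvTestTable writes the test rows to a temporary CSV file and returns a CREATE TABLE statement that declares event_time as the rowtime attribute. It uses only the JDK; the returned string is meant to be passed to tEnv.executeSql(...). The timestamp layout "yyyy-MM-dd HH:mm:ss.SSS" is assumed to be what the csv format parses for TIMESTAMP(3).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical test helper (not part of Flink): materialises in-memory test
 * rows as a CSV file and builds DDL that reads the file back with a watermark.
 */
final class CsvTestTable {

    private CsvTestTable() {}

    /** Writes one line per row, joining fields with commas (no quoting or escaping). */
    static Path writeRows(List<List<String>> rows) throws IOException {
        Path file = Files.createTempFile("flink-test-", ".csv");
        List<String> lines = rows.stream()
                .map(row -> String.join(",", row))
                .collect(Collectors.toList());
        Files.write(file, lines, StandardCharsets.UTF_8);
        return file;
    }

    /** Builds DDL for a filesystem/csv table with event_time as the rowtime attribute. */
    static String ddl(String tableName, Path csvFile) {
        return "CREATE TABLE " + tableName + " (\n"
                + "  event_time TIMESTAMP(3),\n"
                + "  foobar STRING,\n"
                + "  WATERMARK FOR event_time AS event_time\n"
                + ") WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = '" + csvFile.toUri() + "',\n"
                + "  'format' = 'csv'\n"
                + ")";
    }
}
```

In a test you would then call tEnv.executeSql(CsvTestTable.ddl("test_events", csvFile)) and read the named table back with tEnv.from("test_events"). Because the helper does no CSV quoting, values containing commas or quotes would need escaping first.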

    Note: Using SQL DDL syntax is the recommended way to create tables from Flink 1.10 onwards. It makes both things you're trying to do (i.e. defining a watermark and naming your table) more straightforward:

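    // "table_name" is the name the table is registered under; fill in your connector options in WITH (...)
    tEnv.executeSql("CREATE TABLE table_name (\n" +
                    "             event_time TIMESTAMP(3),\n" +
                    "             foobar STRING,\n" +
                    "             WATERMARK FOR event_time AS event_time\n" +
                    ") WITH (...)"
    );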
    
    Table data = tEnv.from("table_name");
    
    

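    The watermark is declared with a WATERMARK FOR clause on the rowtime column, and there are multiple watermark strategies you can opt to use (for example, event_time alone for strictly ascending timestamps, or event_time - INTERVAL '5' SECOND to tolerate bounded out-of-orderness). Please check this documentation page for more details.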
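To make the strategy choice concrete, here is a small sketch using a hypothetical helper, WatermarkClauses (plain JDK, not a Flink API), that renders the three WATERMARK clauses described in the Flink DDL documentation: strictly ascending, ascending, and bounded out-of-orderness. The WITH options are passed in as a string because the connector configuration is elided in the answer.

```java
/**
 * Hypothetical helper (not part of Flink) that renders WATERMARK clauses
 * for the strategies described in the Flink CREATE TABLE documentation.
 */
final class WatermarkClauses {

    private WatermarkClauses() {}

    /** Watermark = max timestamp seen so far; a row equal to that max is already late. */
    static String strictlyAscending(String rowtime) {
        return "WATERMARK FOR " + rowtime + " AS " + rowtime;
    }

    /** Watermark = max timestamp minus 1 ms, so rows repeating the max timestamp are not late. */
    static String ascending(String rowtime) {
        return "WATERMARK FOR " + rowtime + " AS " + rowtime + " - INTERVAL '0.001' SECOND";
    }

    /** Watermark trails the max timestamp by a fixed delay to tolerate out-of-order rows. */
    static String boundedOutOfOrderness(String rowtime, int seconds) {
        return "WATERMARK FOR " + rowtime + " AS " + rowtime
                + " - INTERVAL '" + seconds + "' SECOND";
    }

    /** Assembles the DDL from the question's two columns plus the chosen clause. */
    static String createTable(String tableName, String watermarkClause, String withOptions) {
        return "CREATE TABLE " + tableName + " (\n"
                + "  event_time TIMESTAMP(3),\n"
                + "  foobar STRING,\n"
                + "  " + watermarkClause + "\n"
                + ") WITH (" + withOptions + ")";
    }
}
```

For a CEP unit test whose rows are already in event-time order, a strictly ascending or ascending watermark generally keeps results deterministic; a bounded delay is mainly useful if you deliberately feed out-of-order rows and don't want them dropped as late.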