I'm writing a unit test for a Flink SQL statement that uses MATCH_RECOGNIZE. I'm setting up the test data like this:
Table data = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("foobar", DataTypes.STRING()),
        ....
    ),
    row(...),
    row(...)
);
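I have two questions:
1. How do I define a watermark on event_time, so that it becomes a rowtime attribute?
2. How do I give this table a name, so that I can refer to it in the SQL statement under test?
Flink version: 1.11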
You've hit a current limitation of the Table API: it's not possible to define watermarks and rowtime attributes on a table created with the fromValues method; you need a connector for that. There are a couple of options to work around it:
1. Use a CSV connector that you populate with your test values, as shown in this example.
2. Use the built-in DataGen connector. Since you're putting together a unit test for CEP, I imagine you want some control over exactly which rows are generated, so this is probably not a viable option. I thought I'd mention it anyway.
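A minimal sketch of option 1, assuming the Flink 1.11 filesystem connector with 'format' = 'csv' and a local temp directory; the class CsvTestSource, its methods writeRows and createTableDdl, the table name "events" and the timestamp layout of the rows are all hypothetical. It writes the test rows to a CSV file and builds a DDL that declares the watermark, so the returned string could be passed to tEnv.executeSql(...) before calling tEnv.from("events"). Only the file and SQL text are produced here; Flink itself is not invoked.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical test helper: materializes test rows as a CSV file and builds a
// CREATE TABLE statement that reads them back with a watermark on event_time.
class CsvTestSource {

    // Writes one CSV line per test row into a fresh temp directory and returns that directory.
    static Path writeRows(List<String> csvLines) {
        try {
            Path dir = Files.createTempDirectory("cep-test-");
            Files.write(dir.resolve("events.csv"), csvLines, StandardCharsets.UTF_8);
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds the DDL; the connector and format options assume Flink 1.11's filesystem connector.
    static String createTableDdl(String tableName, Path dir) {
        return "CREATE TABLE " + tableName + " (\n"
            + "  event_time TIMESTAMP(3),\n"
            + "  foobar STRING,\n"
            + "  WATERMARK FOR event_time AS event_time\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '" + dir.toUri() + "',\n"
            + "  'format' = 'csv'\n"
            + ")";
    }

    public static void main(String[] args) {
        // Rows in "event_time,foobar" column order; the timestamp layout is an assumption.
        Path dir = writeRows(Arrays.asList(
            "2020-07-01 12:00:00.000,a",
            "2020-07-01 12:00:01.000,b"));
        String ddl = createTableDdl("events", dir);
        System.out.println(ddl);
        // In the Flink test itself (not executed here):
        //   tEnv.executeSql(ddl);
        //   Table data = tEnv.from("events");
    }
}
```

Writing to a fresh temp directory keeps each test isolated, and because the rows are written by hand you keep full control over their order and timestamps, which is the main drawback of the DataGen option.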
Note: Using SQL DDL syntax has been the recommended way to create tables since Flink 1.10. It would make both things you're trying to do (i.e. defining a watermark and naming your table) more straightforward:
tEnv.executeSql("CREATE TABLE table_name (\n" +
    "  event_time TIMESTAMP(3),\n" +
    "  foobar STRING,\n" +
    "  WATERMARK FOR event_time AS event_time\n" +
    ") WITH (...)"
);
Table data = tEnv.from("table_name");
The watermark is declared as part of the table schema with a WATERMARK FOR clause, and there are multiple watermark strategies you can choose from. Please check this documentation page for more details.
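For reference, a sketch that renders the three strategies as they are written in the Flink SQL DDL documentation (strictly ascending, ascending, bounded out-of-orderness). The class WatermarkClauses and its method names are hypothetical, and only the SQL text is generated; any of these clauses could replace the WATERMARK line in the CREATE TABLE statement above.

```java
// Hypothetical helper that renders the WATERMARK clauses described in the Flink
// DDL docs for a given rowtime column; it only builds SQL text.
class WatermarkClauses {

    // Strictly ascending: watermark is the maximum timestamp observed so far.
    static String strictlyAscending(String rowtime) {
        return "WATERMARK FOR " + rowtime + " AS " + rowtime;
    }

    // Ascending: rows may share a timestamp, so the watermark trails the max by 1 ms.
    static String ascending(String rowtime) {
        return "WATERMARK FOR " + rowtime + " AS " + rowtime + " - INTERVAL '0.001' SECOND";
    }

    // Bounded out-of-orderness: watermark trails the max timestamp by a fixed delay.
    static String boundedOutOfOrderness(String rowtime, int seconds) {
        return "WATERMARK FOR " + rowtime + " AS " + rowtime
            + " - INTERVAL '" + seconds + "' SECOND";
    }

    public static void main(String[] args) {
        System.out.println(strictlyAscending("event_time"));
        System.out.println(ascending("event_time"));
        System.out.println(boundedOutOfOrderness("event_time", 5));
    }
}
```

In a unit test where the rows are hand-written in timestamp order, the strictly ascending form is likely enough; the bounded variant becomes relevant if the test deliberately feeds out-of-order events.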