I have the below SQL query that I'm using in a Flink job. mysql_table
is created using the JDBC connector and kafka_source is created from
the incoming Kafka stream.
SELECT T.event_id, T.event_name,
       TUMBLE_END(T.event_time, INTERVAL '5' MINUTE) AS event_time,
       MAX(T.event_value) AS max_event_value
FROM (
  SELECT d.event_id, d.event_name, d.event_source_id, d.event_key, s.event_value, s.event_time
  FROM kafka_source s
  JOIN mysql_table FOR SYSTEM_TIME AS OF s.proc_time AS d
    ON d.event_id = s.event_id AND d.event_name = s.event_name
) T
GROUP BY T.event_id, T.event_name, TUMBLE(T.event_time, INTERVAL '5' MINUTE)
I am performing a temporal join between the two, and it runs fine when I check it in Flink's sql-client CLI (tested with flink-faker). The inner query works perfectly fine and prints results. Can anyone help me identify the issue with the query?
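For context, a processing-time temporal join like the one above needs a proc-time attribute on the probe (Kafka) side and a lookup-capable table on the build (JDBC) side. A minimal sketch of the two table definitions is below; the connector options, topic, URL, and table names are assumptions for illustration, not the actual DDL from the job:

```sql
-- Sketch only: connector options and names below are assumed, adjust to your setup.
CREATE TABLE kafka_source (
  event_id STRING,
  event_name STRING,
  event_value DOUBLE,
  eventTime BIGINT,                                   -- raw epoch-millis from the topic
  event_time AS TO_TIMESTAMP_LTZ(eventTime, 3),       -- event-time attribute
  proc_time AS PROCTIME(),                            -- required for FOR SYSTEM_TIME AS OF
  WATERMARK FOR event_time AS event_time - INTERVAL '20' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',                                 -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker
  'format' = 'json'
);

CREATE TABLE mysql_table (
  event_id VARCHAR(64),
  event_source_id VARCHAR(255),
  event_name VARCHAR(255),
  event_key VARCHAR(255)                              -- referenced in the join query
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/db',           -- assumed URL
  'table-name' = 'events_dim'                         -- assumed table name
);
```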
Edit: I'm looking for output with TUMBLE windows computed over 5 minutes, like this:
+I ("11", "SPILL_OVER", "2022-04-28T00:30:00", 28.0)
+I ("11", "SPILL_OVER", "2022-04-28T00:35:00", 32.4)
+I ("11", "SPILL_OVER", "2022-04-28T00:40:00", 19.6)
+I ("11", "SPILL_OVER", "2022-04-28T00:45:00", 22.3)
The schema of the MySQL table is:
+-----------------+--------------+
| Field | Type |
+-----------------+--------------+
| event_id | varchar(64) |
| event_source_id | varchar(255) |
| event_name | varchar(255) |
and the Kafka table's schema is:
event_id STRING
event_name STRING
event_time TIMESTAMP(9)
event_value DOUBLE
Edit: I observed that TUMBLE_END works fine with the PROCTIME() column, but the same query is not working with event_time. Here is the output of a query that selects proc_time and event_time respectively:
+I[2022-05-09T14:36:21.078Z, 2022-05-09T14:36:14.163Z]
+I[2022-05-09T14:36:21.079Z, 2022-05-09T14:36:19.170Z]
The below query works on top of the kafka_source table:
SELECT event_id, event_name, MAX(event_value), TUMBLE_END(proc_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE)
and gives output similar to this:
+I[11, SPILL_OVER, 2022-05-09T20:10, 0.93]
+I[12, SPILL_OVER, 2022-05-09T20:10, 0.9]
+I[11, PRAXY, 2022-05-09T20:12, 0.91]
The same query returns no results when I use event_time in place of proc_time. I'm creating these columns in the table like this:
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTime, 3)")
.watermark("event_time", "event_time - INTERVAL '20' SECOND")
.build()
where eventTime is simply the incoming timestamp value from the Kafka topic. Both fields are of the same type, TIMESTAMP_LTZ(3):
proc_time TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
event_time TIMESTAMP_LTZ(3) *ROWTIME*
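An event-time window only emits once the watermark passes the window's end, so a quick way to diagnose this is to check whether the watermark is advancing at all. A sketch, assuming Flink 1.15+ (where the CURRENT_WATERMARK function is available):

```sql
-- Diagnostic sketch: if wm stays NULL (or stalls) while rows keep arriving,
-- the watermark is not advancing and event-time windows will never fire.
SELECT event_id, event_time, CURRENT_WATERMARK(event_time) AS wm
FROM kafka_source;
```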
What mistake am I making here?
I fixed this by adding a setting to the table environment:
tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")
I was able to create windows right after setting the timeout.
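For anyone hitting the same problem: the watermark of a Kafka source is the minimum watermark across all partitions. If any partition receives no data, it never produces a watermark, the overall watermark stalls, and event-time windows never close. table.exec.source.idle-timeout marks such partitions as idle after the given interval so they stop holding the watermark back. The same option can also be set in the sql-client (a sketch using Flink's SET command):

```sql
-- Mark sources/partitions idle after 5 s of inactivity so they stop
-- holding back the watermark (same option set programmatically above).
SET 'table.exec.source.idle-timeout' = '5000 ms';
```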