Tags: apache-flink, flink-sql

Flink SQL Query not returning results


I have the below SQL query that I'm using in a Flink job. mysql_table is created using the JDBC connector and kafka_source is created from the incoming Kafka stream.

SELECT T.event_id, T.event_name,
       TUMBLE_END(T.event_time, INTERVAL '5' MINUTE) AS event_time,
       MAX(T.event_value) AS max_event_value
FROM (
    SELECT d.event_id, d.event_name, d.event_source_id, d.event_key, s.event_value, s.event_time
    FROM kafka_source s
    JOIN mysql_table FOR SYSTEM_TIME AS OF s.proc_time AS d
        ON d.event_id = s.event_id AND d.event_name = s.event_name
) T
GROUP BY T.event_id, T.event_key, TUMBLE(T.event_time, INTERVAL '5' MINUTE)

I am performing a temporal join between the two, and this runs fine when I check it in Flink's sql-client CLI (tested with flink-faker). The inner query works perfectly fine and prints results. Can anyone help me identify the issue with the query?

Edit: I'm looking for output with TUMBLE windows of 5 minutes, like this:

+I ("11", "SPILL_OVER", "2022-04-28T00:30:00", 28.0)
+I ("11", "SPILL_OVER", "2022-04-28T00:35:00", 32.4)
+I ("11", "SPILL_OVER", "2022-04-28T00:40:00", 19.6)
+I ("11", "SPILL_OVER", "2022-04-28T00:45:00", 22.3)

The schema of the MySQL table is:

+-----------------+--------------+
| Field           | Type         |
+-----------------+--------------+
| event_id        | varchar(64)  |
| event_source_id | varchar(255) |
| event_name      | varchar(255) |
+-----------------+--------------+
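
In DDL terms, mysql_table is registered roughly like this (a simplified sketch: the connector options are placeholders, and columns beyond the three listed above are left out):

CREATE TABLE mysql_table (
  event_id        VARCHAR(64),
  event_source_id VARCHAR(255),
  event_name      VARCHAR(255)
  -- plus event_key and any other columns used in the query
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://...',   -- placeholder
  'table-name' = '...'                 -- placeholder
);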

and the Kafka table's schema is:

event_id STRING
event_name STRING
event_time TIMESTAMP(9)
event_value DOUBLE

Edit: I observed that TUMBLE_END works fine with the PROCTIME() column, but not with event_time. Here is the output of a query that selects proc_time and event_time, respectively:

+I[2022-05-09T14:36:21.078Z, 2022-05-09T14:36:14.163Z]
+I[2022-05-09T14:36:21.079Z, 2022-05-09T14:36:19.170Z]

The query below works on top of the kafka_source table:

SELECT event_id, event_name, MAX(event_value), TUMBLE_END(proc_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE)

and gives output like the below:

+I[11, SPILL_OVER, 2022-05-09T20:10, 0.93]
+I[12, SPILL_OVER, 2022-05-09T20:10, 0.9]
+I[11, PRAXY, 2022-05-09T20:12, 0.91]
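
The same query with event_time in place of proc_time does not return any result:

SELECT event_id, event_name, MAX(event_value), TUMBLE_END(event_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(event_time, INTERVAL '2' MINUTE)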

I'm creating the proc_time and event_time columns in the table like this:

Schema.newBuilder()
    // processing-time attribute
    .columnByExpression("proc_time", "PROCTIME()")
    // event-time attribute derived from the raw eventTime field in the Kafka record
    .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTime, 3)")
    // event_time is the rowtime column; allow events up to 20 seconds out of order
    .watermark("event_time", "event_time - INTERVAL '20' SECOND")
    .build()

where eventTime is simply the incoming timestamp value from the Kafka topic. Both fields are of the same type, TIMESTAMP_LTZ(3):

proc_time TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
event_time TIMESTAMP_LTZ(3) *ROWTIME*
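
For reference, the DDL equivalent of this schema would look roughly like the following (assuming eventTime arrives as a BIGINT epoch-millisecond value, which is what TO_TIMESTAMP_LTZ(eventTime, 3) expects):

CREATE TABLE kafka_source (
  event_id    STRING,
  event_name  STRING,
  eventTime   BIGINT,
  event_value DOUBLE,
  proc_time  AS PROCTIME(),
  event_time AS TO_TIMESTAMP_LTZ(eventTime, 3),
  WATERMARK FOR event_time AS event_time - INTERVAL '20' SECOND
) WITH (
  'connector' = 'kafka'   -- remaining connector options omitted
);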

What mistake am I making here?


Solution

  • I fixed this by adding a setting to the table environment:

    tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")
    

    I was able to create windows right after setting the timeout.
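
    Why this helps: with an event-time watermark, a TUMBLE window only emits its result once the watermark passes the end of the window, and the watermark is the minimum across all source partitions. If some Kafka partitions receive no data (which is likely in a test setup with only a few keys), the watermark never advances and the event-time windows never fire, even though the processing-time version works. The idle timeout tells Flink to mark silent partitions as idle so the watermark can still advance. The equivalent setting in the SQL client would be:

    SET 'table.exec.source.idle-timeout' = '5000 ms';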