I am using Flink 1.12. I want to read a CSV file and do a tumbling-window GROUP BY based on processing time.
The code is as follows, but the query sql_tumble_window produces no output. I would like to know where the problem is.
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Sql017_ProcessTimeAttributeDDLTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)

    val ddl =
      """
        create table sourceTable(
          key STRING,
          price DOUBLE,
          pt as PROCTIME() -- processing time
        ) with (
          'connector' = 'filesystem',
          'path' = 'D:/stock_id_price.csv',
          'format' = 'csv'
        )
      """.stripMargin(' ')

    // Create the source table
    tenv.executeSql(ddl)

    // NOTE: The following query produces the correct result
    tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()

    // There is no output for the tumble group by query
    val sql_tumble_window =
      """
        SELECT
          TUMBLE_START(pt, INTERVAL '4' second),
          TUMBLE_END(pt, INTERVAL '4' second),
          sum(price),
          'FLAG'
        FROM sourceTable
        GROUP BY TUMBLE(pt, INTERVAL '4' second)
      """.stripMargin(' ')

    println("=" * 20)
    println("=" * 20)

    // There is no output for this sql query
    tenv.sqlQuery(sql_tumble_window).toAppendStream[Row].print()

    env.execute()
    Thread.sleep(20 * 1000)
  }
}
```
The problem is that the job runs to completion before the window has a chance to fire.
When a Flink streaming job runs on bounded input (such as a file), the job ends as soon as it has fully consumed and processed that input. Meanwhile, the 4-second processing-time tumbling windows are aligned to the epoch, so a window only fires when the wall-clock time reaches an exact multiple of 4 seconds since the epoch. Unless the CSV file is long enough that reading it carries the job across one of those boundaries, the job finishes first and the pending window result is never emitted.
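To make the timing concrete, here is a small plain-Scala sketch (no Flink dependency) of epoch-aligned tumbling windows. It assumes a zero window offset, non-negative timestamps, continuous reading between job start and job end, and, as described above, that a window fires only if the job is still running when the clock reaches the window end. The helper names `windowStart`, `windowEnd`, and `firedWindowEnds` are hypothetical and are not Flink API.

```scala
object TumbleWindowTiming {
  // Start of the epoch-aligned tumbling window containing `ts`
  // (assumes offset 0 and ts >= 0).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - ts % sizeMs

  // Exclusive end of that window; in this model the window fires
  // when processing time reaches this value.
  def windowEnd(ts: Long, sizeMs: Long): Long = windowStart(ts, sizeMs) + sizeMs

  // Window ends reached while the job is still alive, assuming records are
  // read continuously from jobStartMs until the job finishes at jobEndMs.
  // Windows whose end lies after jobEndMs never fire.
  def firedWindowEnds(jobStartMs: Long, jobEndMs: Long, sizeMs: Long): List[Long] =
    (windowEnd(jobStartMs, sizeMs) to jobEndMs by sizeMs).toList

  def main(args: Array[String]): Unit = {
    val size = 4000L
    // Short file: reading takes 300 ms and never crosses a 4-second boundary.
    println(firedWindowEnds(1500L, 1800L, size))
    // Long file: reading lasts until t = 9000 ms, so two windows fire.
    println(firedWindowEnds(1500L, 9000L, size))
  }
}
```

With a 300 ms read, no window end falls inside the job's lifetime, so nothing is printed by the windowed query; only when the input takes several seconds to process do any windows close before the job ends.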
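You might expect the 20-second sleep to take care of this. But the sleep runs in the Flink client program, after the job has been submitted to the cluster (and, since env.execute() blocks until the job completes in the default attached mode, typically after the job has already finished). It does not affect the execution of the streaming job itself.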