apache-flink · flink-streaming · flink-sql

There is no output for the tumble window group by in my case


I am using Flink 1.12. I want to read a CSV file and perform a tumbling-window GROUP BY based on processing time.

The code is as follows, but the query sql_tumble_window produces no output. I would like to know where the problem is.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Sql017_ProcessTimeAttributeDDLTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)

    val ddl =
      """
      create table sourceTable(
      key STRING,
      price DOUBLE,
      pt as PROCTIME() -- processing time
      ) with (
        'connector' = 'filesystem',
        'path' = 'D:/stock_id_price.csv',
        'format' = 'csv'
      )
      """.stripMargin(' ')

    //Create the source table
    tenv.executeSql(ddl)

    //NOTE: The following query produces correct result
    tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()

    //there is no output for the tumble group by query
    val sql_tumble_window =
      """
       SELECT
           TUMBLE_START(pt, INTERVAL '4' second),
           TUMBLE_END(pt, INTERVAL '4' second),
           sum(price),
           'FLAG'
       FROM sourceTable
       GROUP BY TUMBLE(pt, INTERVAL '4' second)
     """.stripMargin(' ')

    println("=" * 20)
    println("=" * 20)

    //There is no output for this sql query
    tenv.sqlQuery(sql_tumble_window).toAppendStream[Row].print()

    env.execute()

    Thread.sleep(20 * 1000)
  }

}


Solution

  • The problem is that the job runs to completion before the window has a chance to fire.

    When a Flink streaming job is run with a bounded input (such as a file), the job ends once it has fully consumed and processed the input. Meanwhile, the 4-second processing-time window only fires when the wall-clock time reaches an exact multiple of 4 seconds since the epoch -- and the job is unlikely to still be running at that boundary unless the CSV file is very long.
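
    To see why, note that Flink aligns tumbling windows to the epoch rather than to the job's start time. A minimal sketch of that alignment arithmetic in plain Scala (the helper names here are illustrative, not part of Flink's API):

    ```scala
    // Sketch: how a TUMBLE window aligns to the epoch. A 4-second window
    // covers [start, start + 4000) ms, where start is the timestamp rounded
    // down to a multiple of the window size.
    object TumbleAlignment {
      // Start of the tumbling window containing `timestampMs`, for a window
      // of `windowSizeMs` milliseconds aligned to the epoch (Flink's default).
      def windowStart(timestampMs: Long, windowSizeMs: Long): Long =
        timestampMs - (timestampMs % windowSizeMs)

      def windowEnd(timestampMs: Long, windowSizeMs: Long): Long =
        windowStart(timestampMs, windowSizeMs) + windowSizeMs

      def main(args: Array[String]): Unit = {
        val ts = 10500L // a record processed 10.5 s after the epoch
        println(windowStart(ts, 4000L)) // 8000: the window is [8000, 12000)
        println(windowEnd(ts, 4000L))   // 12000: the window only fires if the
                                        // job is still running at that instant
      }
    }
    ```

    So a record read at, say, 10.5 seconds past a 4-second boundary sits in a window that will not fire for another 1.5 seconds -- and by then a job over a short file has already finished.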

    You might expect the 20-second sleep to take care of this, but the sleep happens in the Flink client after it submits the job to the cluster. It does not affect the execution of the streaming job itself.
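
    One way to get windowed results from a bounded file is to switch to event time: at end-of-input Flink emits a final watermark that fires any pending event-time windows, whereas pending processing-time timers simply never fire. A sketch of that change, assuming the CSV gains a timestamp column (the `ts` column name is illustrative):

    ```sql
    create table sourceTable (
      key STRING,
      price DOUBLE,
      ts TIMESTAMP(3),  -- event-time column, assumed to exist in the CSV
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) with (
      'connector' = 'filesystem',
      'path' = 'D:/stock_id_price.csv',
      'format' = 'csv'
    );

    -- The same tumbling aggregation, now keyed to event time; the final
    -- watermark emitted at end-of-input closes the last open window.
    SELECT
        TUMBLE_START(ts, INTERVAL '4' SECOND),
        TUMBLE_END(ts, INTERVAL '4' SECOND),
        SUM(price)
    FROM sourceTable
    GROUP BY TUMBLE(ts, INTERVAL '4' SECOND);
    ```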