apache-flink, flink-streaming, flink-sql

Table API in Streaming mode is not printing the results to console


Can you please let me know why the grouped results of a streaming table are not printing to the console?

Non-grouped results print to the console without any issues.

Flink version: 1.15.0 (Scala 2.12, Java 8)

Thank you in advance.

import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Schema, Table, TableDescriptor, TableEnvironment}

object readKafkaStream02 extends App {

  val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    // .inBatchMode()
    .build()

  val tEnv = TableEnvironment.create(settings);

  val schema = Schema.newBuilder()
  schema.column("id", DataTypes.INT())
  schema.column("type", DataTypes.STRING())
  schema.column("amount", DataTypes.FLOAT())
  schema.column("trx_timestamp", DataTypes.TIMESTAMP(3))
  schema.watermark("trx_timestamp", "trx_timestamp - INTERVAL '10' SECOND")

  tEnv.createTemporaryTable("kafka_stream_input",  TableDescriptor.forConnector("kafka")
    .schema(schema.build())
    .format("csv")
    .option("topic","test_producer01")
    .option("properties.bootstrap.servers","localhost:9092")
    .option("properties.group.id","flink-test")
    .option("scan.startup.mode" , "latest-offset")
    .build()
  )

 /* tEnv.executeSql(
    """select * FROM TABLE
      |(
      |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
      |)
      |""".stripMargin ).print()*/  // works fine


  val temp_table : Table  = tEnv.sqlQuery(
  """select  sum(amount)  , window_start , window_end from TABLE
    |(
    |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
    |) GROUP BY window_start, window_end ;
    |""".stripMargin )

  // registerTable is deprecated; createTemporaryView is its replacement
  tEnv.createTemporaryView("temp_table", temp_table)

  temp_table.execute().print() // just hangs and prints no output to the console


 /* tEnv.executeSql(
    """select TUMBLE_START(trx_timestamp, INTERVAL '10' MINUTE ) , sum(amount)  from kafka_stream_input
      |group by TUMBLE(trx_timestamp, INTERVAL '10' MINUTE )
      |""".stripMargin).print()*/

}

Solution

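  • Followed this: Flink SQL Query not returning results

    After setting an idle timeout on the source, the grouped query prints its results as expected. Most likely the watermark was not advancing because some Kafka partitions received no data, so the 2-minute windows never closed. Set the option before executing the query: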

    val tEnv_config = tEnv.getConfig();

    tEnv_config.set("table.exec.source.idle-timeout", "1000 ms");
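
    To illustrate why the idle timeout helps, here is a minimal sketch of how per-partition watermarks combine. It is a simplified model and not Flink's actual implementation: the names WatermarkSketch, Partition, combined and windowFires are hypothetical, and the rule that a window fires once the watermark reaches the window end is an approximation.

```scala
// Simplified model (NOT Flink's real code) of watermark merging across source partitions.
object WatermarkSketch {
  // Hypothetical per-partition state: latest watermark in epoch millis,
  // or None if the partition has not produced a record yet.
  final case class Partition(watermark: Option[Long], idle: Boolean)

  // Combined watermark = minimum over all partitions that are not marked idle.
  // A non-idle partition without a watermark holds the whole result back.
  def combined(partitions: Seq[Partition]): Option[Long] = {
    val active = partitions.filterNot(_.idle)
    if (active.isEmpty || active.exists(_.watermark.isEmpty)) None
    else Some(active.flatMap(_.watermark.toList).min)
  }

  // Approximation: a tumbling window emits its result once the
  // combined watermark has reached the window end.
  def windowFires(windowEnd: Long, watermark: Option[Long]): Boolean =
    watermark.exists(_ >= windowEnd)

  def main(args: Array[String]): Unit = {
    val windowEnd = 120000L // end of the first 2-minute window

    // One partition has data past the window end, the other never received any.
    val stalled = Seq(
      Partition(Some(130000L), idle = false),
      Partition(None, idle = false)
    )
    // What the idle timeout effectively does: the silent partition is marked idle.
    val withTimeout = stalled.map(p => if (p.watermark.isEmpty) p.copy(idle = true) else p)

    println(windowFires(windowEnd, combined(stalled)))     // false
    println(windowFires(windowEnd, combined(withTimeout))) // true
  }
}
```

    In this model the empty partition blocks the window until it is marked idle, which matches the hang seen with the grouped query. The non-grouped query is not affected because it does not wait for a window to close.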