Can you please let me know why the grouped results of a streaming table are not printed to the console?
Non-grouped results are printed to the console without any issues.
Flink Version = 1.15.0 (SCALA = 2.12) (JAVA = 8)
Thank you in advance.
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Schema, Table, TableDescriptor, TableEnvironment}
/**
 * Reads CSV records from the Kafka topic `test_producer01` and prints, for each
 * 2-minute tumbling event-time window on `trx_timestamp`, the sum of `amount`.
 *
 * A windowed aggregation only emits a row once the watermark passes the end of
 * the window. Without a source idle timeout, a Kafka partition that receives no
 * records holds the watermark back, so the grouped query produces no output
 * even though the non-grouped query prints every row. Setting
 * `table.exec.source.idle-timeout` lets the watermark advance past idle
 * partitions.
 */
object readKafkaStream02 extends App {
  val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    // .inBatchMode()
    .build()
  val tEnv = TableEnvironment.create(settings)

  // Mark a source partition as idle after 1 second without records so that it
  // no longer blocks watermark progress; otherwise the window below never closes.
  tEnv.getConfig.set("table.exec.source.idle-timeout", "1000 ms")

  // Schema of the incoming CSV rows; the watermark trails the event time by 10 seconds.
  val schema = Schema.newBuilder()
    .column("id", DataTypes.INT())
    .column("type", DataTypes.STRING())
    .column("amount", DataTypes.FLOAT())
    .column("trx_timestamp", DataTypes.TIMESTAMP(3))
    .watermark("trx_timestamp", "trx_timestamp - INTERVAL '10' SECOND")

  tEnv.createTemporaryTable(
    "kafka_stream_input",
    TableDescriptor.forConnector("kafka")
      .schema(schema.build())
      .format("csv")
      .option("topic", "test_producer01")
      .option("properties.bootstrap.servers", "localhost:9092")
      .option("properties.group.id", "flink-test")
      .option("scan.startup.mode", "latest-offset")
      .build()
  )

  // Non-grouped windowing TVF query: works fine, every row is printed as it arrives.
  /* tEnv.executeSql(
  """select * FROM TABLE
  |(
  |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
  |)
  |""".stripMargin ).print()*/

  // Grouped query: one row per window, emitted only after the watermark passes window_end.
  val temp_table: Table = tEnv.sqlQuery(
    """select sum(amount) , window_start , window_end from TABLE
      |(
      |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
      |) GROUP BY window_start, window_end ;
      |""".stripMargin)

  // createTemporaryView replaces the deprecated registerTable.
  tEnv.createTemporaryView("temp_table", temp_table)

  // Previously this just hung and printed no output to the console, because
  // the watermark never advanced (see the idle-timeout setting above).
  temp_table.execute().print()

  // Equivalent legacy group-window syntax, kept for reference.
  /* tEnv.executeSql(
  """select TUMBLE_START(trx_timestamp, INTERVAL '10' MINUTE ) , sum(amount) from kafka_stream_input
  |group by TUMBLE(trx_timestamp, INTERVAL '10' MINUTE )
  |""".stripMargin).print()*/
}
I followed the answer to "Flink SQL Query not returning results".
After adding the following configuration, the grouped query works as expected:
// Treat a source partition as idle after 1000 ms without records so that it
// does not hold back the watermark and windowed aggregations can emit results.
val tEnv_config = tEnv.getConfig
tEnv_config.set("table.exec.source.idle-timeout", "1000 ms")