apache-flink flink-sql

Confused about converting a Table to a DataSet or DataStream


I am using Flink 1.12, and I am confused about when a Table can be converted to a DataSet or DataStream. In the following code, I want to print the table content to the console. I tried the following three ways, and all of them fail:

  1. table.toDataSet[Row].print()
  2. table.toAppendStream[Row].print()
  3. table.print()

How can I print the table content to the console, e.g. using a print method?

import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableEnvironment, TableResult}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.types.Row

object Sql021_PlannerOldBatchTest {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

    val env = TableEnvironment.create(settings)

    val fmt = new Csv().fieldDelimiter(',').deriveSchema()
    val schema = new Schema()
      .field("a", DataTypes.STRING())
      .field("b", DataTypes.STRING())
      .field("c", DataTypes.DOUBLE())

    env.connect(new FileSystem().path("D:/stock.csv")).withSchema(schema).withFormat(fmt).createTemporaryTable("sourceTable")

    val table = env.sqlQuery("select * from sourceTable")



    //ERROR: Only tables that originate from Scala DataSets can be converted to Scala DataSets.
    // table.toDataSet[Row].print()

    //ERROR: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
    table.toAppendStream[Row].print()


    //ERROR: Table does not have a print method
    //    table.print()

  }

}


Solution

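  • In the streaming case, convert the table through a StreamTableEnvironment (tenv below, created from the StreamExecutionEnvironment env), since a table created by a plain TableEnvironment cannot be converted to a DataStream. This will work: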

    tenv.toAppendStream(table, classOf[Row]).print()
    env.execute()
    

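    In the batch case, with env being the TableEnvironment from the question, you can execute the query and print the result directly: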

    val tableResult: TableResult = env.executeSql("select * from sourceTable")
    tableResult.print()
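
For completeness, here is a minimal sketch of both variants as full programs against Flink 1.12 with the Blink planner. It is an illustration under assumptions, not the only way to do it. The object names and the SourceDdl helper are hypothetical, and the source table is declared with a filesystem/CSV DDL instead of the connect() descriptor from the question, assuming the file has the three columns a, b, c. The streaming variant uses the Scala bridge form toAppendStream[Row](table). The batch variant uses Table.execute(), which returns a TableResult that can be printed.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical helper: DDL for the CSV source used by both variants.
// Assumption: the path and columns match the file from the question.
object SourceDdl {
  val ddl: String =
    """CREATE TABLE sourceTable (
      |  a STRING,
      |  b STRING,
      |  c DOUBLE
      |) WITH (
      |  'connector' = 'filesystem',
      |  'path' = 'D:/stock.csv',
      |  'format' = 'csv'
      |)""".stripMargin
}

// Streaming: the table must come from a StreamTableEnvironment
// so that it can be converted to a DataStream.
object PrintTableStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tenv = StreamTableEnvironment.create(env, settings)

    tenv.executeSql(SourceDdl.ddl)
    val table = tenv.sqlQuery("SELECT * FROM sourceTable")

    // Scala bridge API: the target type is a type parameter.
    tenv.toAppendStream[Row](table).print()
    env.execute()
  }
}

// Batch: a plain TableEnvironment is enough; no DataSet conversion is needed.
object PrintTableBatch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tenv = TableEnvironment.create(settings)

    tenv.executeSql(SourceDdl.ddl)
    // execute() submits the job and returns a TableResult; print() writes the rows to the console.
    tenv.sqlQuery("SELECT * FROM sourceTable").execute().print()
  }
}
```

In the batch variant, table.execute().print() is the closest equivalent to the table.print() call attempted in the question.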