
Apache Flink Table query result as string values


I am writing a query with the Flink Table API to retrieve a record. I then want to check whether a record was found and, if so, get the string value of each of its columns.

For example:

users: 
|id | name | phone |
|---|------|-------|
| 01| sam  | 23354 |
| 02| jake | 23352 |
| 03| kim  | 23351 |

The issue is that Flink only returns a Table from a query, so I am not able to (1) check whether a record was found or (2) get the individual column values of the found record.

Pseudocode:

foundRecord = find record by phone
  if foundRecord {
    create new instance of Visitor
    Visitor.name = foundRecord.name
    Visitor.id = foundRecord.id
  } else {
    throw exception
  }
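For reference, the pseudocode can be written as plain Scala once the candidate rows are in memory on the client. This is a minimal sketch assuming the rows are already available as `(id, name, phone)` tuples; `PseudocodeSketch`, `findVisitor` and `RecordNotFoundException` are hypothetical names, and getting the rows out of a Flink `Table` in the first place is the open question here.

```scala
// Hypothetical exception for the "record not found" branch
class RecordNotFoundException(phone: String)
  extends Exception(s"no record with phone $phone")

case class Visitor(name: String, id: String)

object PseudocodeSketch {
  // Assumption: rows are (id, name, phone) tuples already held in memory
  def findVisitor(rows: Seq[(String, String, String)], phone: String): Visitor =
    rows.find(_._3 == phone) match {
      case Some((id, name, _)) => Visitor(name, id) // record found
      case None                => throw new RecordNotFoundException(phone)
    }

  def main(args: Array[String]): Unit = {
    val users = Seq(("01", "sam", "23354"), ("02", "jake", "23352"), ("03", "kim", "23351"))
    println(findVisitor(users, "23354")) // Visitor(sam,01)
  }
}
```

Returning a `Visitor` or throwing mirrors the if/else in the pseudocode; `find` returns an `Option`, which makes the found and not-found cases explicit.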

The code below, as recommended by the Flink docs, gives me a Table, but I'm not sure how to implement the pseudocode above, since the query returns another table and I need the actual record values.

    Table users = registeredUsers.select("id, name, phone").where("phone === '23354'");

Flink docs for reference: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#expression-syntax


Solution

  • To know that no matching record exists, the input must be bounded, so we'll use a BatchTableEnvironment rather than a StreamTableEnvironment. (With streaming input, a matching record might arrive later and satisfy the query; only with batch input can we prove that no match exists.)

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.scala.BatchTableEnvironment
    import org.apache.flink.types.Row
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.util.Collector
    
    class MissingResultException() extends Exception {}
    
    object Phone {
      case class Visitor(name: String, id: String)
    
      @throws[Exception]
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tableEnv = BatchTableEnvironment.create(env)
    
        // Bounded (batch) input rows: (id, name, phone)
        val rawInput = env.fromElements(
          ("01", "sam", "23354"),
          ("02", "jake", "23352"),
          ("03", "kim", "23351"))
    
        val events = tableEnv.fromDataSet(rawInput, 'id, 'name, 'phone)
        tableEnv.registerTable("events", events)
        // 'missing' matches no phone number, to exercise the empty-result path
        val resultTable = tableEnv
          .from("events")
          .select('id, 'name, 'phone)
          .where("phone === 'missing'")
    
        // Convert the Table back into a DataSet of generic Rows
        val results = resultTable.toDataSet[Row]
    
        // Field positions follow the select: 0 = id, 1 = name, 2 = phone
        results
          .map(row => Visitor(row.getField(1).toString, row.getField(0).toString))
          .print()
    
        // count() runs a job and returns the number of rows to the client
        val count: DataSet[Long] = env.fromElements(results.count())
    
        // Throw if no row matched; otherwise emit nothing
        count
          .flatMap(new FlatMapFunction[Long, Long] {
            override def flatMap(x: Long, collector: Collector[Long]): Unit = {
              if (x == 0L) {
                throw new MissingResultException
              }
            }
          })
          .print()
      }
    }
    

The approach I used to detect that the result set is empty feels like something of a hack, but I couldn't think of anything better. Note that the print() at the very end is necessary even though there is nothing to print: in the DataSet API, an operator that doesn't ultimately feed a sink is never executed, and print() both adds a sink and triggers execution.
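A possibly simpler alternative, sketched under the assumption of the same Flink 1.9/1.10-era Scala APIs used above (the DataSet-based BatchTableEnvironment was deprecated in later releases): `DataSet.collect()` executes the job and returns the result rows to the client as a `Seq[Row]`, so the emptiness check and the construction of the `Visitor` become ordinary Scala code. `PhoneLookup`, `findVisitor` and `MissingVisitorException` are hypothetical names. Because every matching row is shipped to the client, this only makes sense when the result is small, as it should be for a lookup by phone number.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

// Hypothetical exception thrown when no user has the requested phone number
class MissingVisitorException(phone: String)
  extends Exception(s"no user with phone $phone")

object PhoneLookup {
  case class Visitor(name: String, id: String)

  def findVisitor(phone: String): Visitor = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    val users = tableEnv.fromDataSet(
      env.fromElements(
        ("01", "sam", "23354"),
        ("02", "jake", "23352"),
        ("03", "kim", "23351")),
      'id, 'name, 'phone)

    // Expression predicate: the phone value is passed as a literal
    // rather than spliced into a query string
    val matches: Seq[Row] = users
      .where('phone === phone)
      .select('id, 'name)
      .toDataSet[Row]
      .collect() // runs the job and ships the rows to the client

    // Plain Scala from here on; field 0 = id, field 1 = name
    matches.headOption match {
      case Some(row) => Visitor(row.getField(1).toString, row.getField(0).toString)
      case None      => throw new MissingVisitorException(phone)
    }
  }

  def main(args: Array[String]): Unit = {
    println(findVisitor("23354")) // Visitor(sam,01)
  }
}
```

Compared with the count-and-flatMap approach, no extra DataSet or sink is needed just to raise the exception; the trade-off is that the check runs on the client rather than inside the Flink job.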