Apache Flink: What type of record does JDBCInputFormat return?


I am getting a compilation error related to setRowTypeInfo for a JDBCInputFormat. The error is below, followed by the code that triggers it. Clearly the Tuple2 type of the DataSet conflicts with the RowTypeInfo of the JDBCInputFormat, but I can't find any documentation that clarifies which record type the format returns or how to declare it.

    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink: Compilation failure
    [ERROR] /Users/rocadmin/Desktop/flink/flink/src/main/java/svalarms/BatchJob.java:[125,48] incompatible types: inferred type does not conform to equality constraint(s)
    [ERROR] inferred: org.apache.flink.api.java.tuple.Tuple2
    [ERROR] equality constraints(s): org.apache.flink.api.java.tuple.Tuple2,org.apache.flink.types.Row
    [ERROR]
    [ERROR] -> [Help 1]

    DataSet<Tuple2<Integer, Integer>> dbData =
            env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("oracle.jdbc.driver.OracleDriver")
                            .setDBUrl("jdbc:oracle:thin:@//[ip]:1521/sdmprd")
                            .setQuery("" +
                                    "SELECT T2.work_order_nbr, T2.work_order_nbr " +
                                    "FROM sdm.work_order_master T2  " +
                                    "WHERE " +
                                            "TO_DATE(T2.date_entered + 19000000,'yyymmdd') >= CURRENT_DATE - 14 " +
                                            "AND T2.W_O_TYPE = 'TC' " +
                                            "AND T2.OFFICE_ONLY_FLG = 'N' " +
                                    "")
                            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))

                            .finish()
            );

Solution

  • A JDBCInputFormat returns records of type Row, and env.createInput takes its result type from the input format it is given. Hence, the resulting DataSet must be typed to Row rather than Tuple2, i.e.,

    DataSet<Row> dbData =
      env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername("oracle.jdbc.driver.OracleDriver")
          .setDBUrl("jdbc:oracle:thin:@//[ip]:1521/sdmprd")
          .setQuery(
            "SELECT T2.work_order_nbr, T2.work_order_nbr " +
            "FROM sdm.work_order_master T2  " +
            "WHERE " +
              "TO_DATE(T2.date_entered + 19000000,'yyymmdd') >= CURRENT_DATE - 14 " +
              "AND T2.W_O_TYPE = 'TC' " +
              "AND T2.OFFICE_ONLY_FLG = 'N' "
            )
          .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
          .finish()
        );
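The reason the compiler insists on Row can be illustrated without Flink on the classpath. The following is only a minimal sketch: Row, InputFormat, JdbcLikeInputFormat, createInput and toPairs are hypothetical stand-ins written for this example, not Flink's classes, and the sample values are made up. It assumes that createInput is generic in the record type of the format it receives, which matches the "equality constraint" in the error message above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowInferenceSketch {

    // Hypothetical stand-in for a positional, untyped record such as Flink's Row.
    static final class Row {
        private final Object[] fields;

        Row(Object... fields) {
            this.fields = fields;
        }

        // Fields come back as Object and have to be cast by the caller.
        Object getField(int pos) {
            return fields[pos];
        }
    }

    // Hypothetical stand-in for an input format that produces records of type T.
    interface InputFormat<T> {
        List<T> readAll();
    }

    // Stand-in for a JDBC-style format: its record type is fixed to Row.
    static final class JdbcLikeInputFormat implements InputFormat<Row> {
        @Override
        public List<Row> readAll() {
            // Made-up sample rows with two integer columns each.
            return Arrays.asList(new Row(101, 101), new Row(202, 202));
        }
    }

    // The result type T is taken from the format, not from the assignment target.
    static <T> List<T> createInput(InputFormat<T> format) {
        return format.readAll();
    }

    // Explicit conversion from untyped rows to typed integer pairs.
    static List<int[]> toPairs(List<Row> rows) {
        List<int[]> pairs = new ArrayList<>();
        for (Row row : rows) {
            pairs.add(new int[] {(Integer) row.getField(0), (Integer) row.getField(1)});
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Compiles: T is inferred as Row from JdbcLikeInputFormat.
        List<Row> rows = createInput(new JdbcLikeInputFormat());

        // Would not compile, for the same reason as the Tuple2 assignment in the question:
        // List<int[]> bad = createInput(new JdbcLikeInputFormat());

        for (int[] pair : toPairs(rows)) {
            System.out.println(pair[0] + "," + pair[1]);
        }
    }
}
```

If tuples are still wanted downstream, the same idea applies to the real DataSet<Row>: keep the source typed to Row and convert afterwards, reading each column with row.getField(pos), which returns Object, and casting it to the type actually delivered by the JDBC driver.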