
Create a pageable JDBC source for a Flink job


I am using Flink to process data from a database. I have created an input with JDBC:

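import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo

// Column types of "select id, name from users" (assumes id is BIGINT and name is VARCHAR)
val rowTypeInfo = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
             .setDrivername(driver)
             .setDBUrl(url)
             .setUsername(username)
             .setPassword(password)
             .setQuery("select id, name from users")
             .setRowTypeInfo(rowTypeInfo) // finish() requires the row type to be set
             .finish()

env.createInput(inputFormat)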

The problem is that this input reads all data from the table at once. Since the table contains a huge amount of data, I need something like a pageable JDBC source. Is there an additional setting I can use for that?


Solution

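  • You can split a query into multiple partial queries that are executed independently. To do this, specify the query as a parameterized query and provide the values to bind to its parameter(s). Each set of parameter values becomes a separate input split, so the partial queries can run in parallel.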

    The following is taken from the JavaDoc of JDBCInputFormat.

     * <p>In order to query the JDBC source in parallel, you need to provide a
     * parameterized query template (i.e. a valid {@link PreparedStatement}) and
     * a {@link ParameterValuesProvider} which provides binding values for the
     * query parameters. E.g.:
     *
     * <pre><code>
     *
     * Serializable[][] queryParameters = new String[2][1];
     * queryParameters[0] = new String[]{"Kumar"};
     * queryParameters[1] = new String[]{"Tan Ah Teck"};
     *
     * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
     *              .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
     *              .setDBUrl("jdbc:derby:memory:ebookshop")
     *              .setQuery("select * from books WHERE author = ?")
     *              .setRowTypeInfo(rowTypeInfo)
     *              .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
     *              .finish();
     * </code></pre>
    

    Note that:

    • The queried table should have an appropriate index on the parameterized attribute. Otherwise, every partial query performs its own full table scan, which defeats the purpose.
    • The parameter values should cover all (required) rows of the table exactly once. Otherwise, you might miss some rows or read some rows twice.
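A common way to produce parameter values that satisfy both notes is to split an indexed numeric key into consecutive, non-overlapping ranges. The following is a minimal sketch under assumptions that are not in the question: users has an indexed numeric id column, the query template is "select id, name from users where id between ? and ?", and the smallest and largest ids are known up front (for example from "select min(id), max(id) from users"). The class BetweenRanges and method splitRange are hypothetical names; the code only builds the Serializable[][] array and does not depend on Flink.

```java
import java.io.Serializable;
import java.util.Arrays;

/**
 * Hypothetical helper that builds parameter values for a template such as
 *   "select id, name from users where id between ? and ?"
 * Each inner array is one {start, end} pair, i.e. one partial query.
 */
class BetweenRanges {

    /**
     * Splits the inclusive range [minId, maxId] into consecutive,
     * non-overlapping [start, end] pairs of at most batchSize ids each.
     * Every id in the range falls into exactly one pair.
     */
    static Serializable[][] splitRange(long minId, long maxId, long batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        if (maxId < minId) {
            return new Serializable[0][]; // empty table: no partial queries
        }
        // Ceiling of (maxId - minId + 1) / batchSize
        long count = (maxId - minId) / batchSize + 1;
        Serializable[][] params = new Serializable[(int) count][];
        for (int i = 0; i < count; i++) {
            long start = minId + i * batchSize;
            long end = Math.min(start + batchSize - 1, maxId); // last range may be shorter
            params[i] = new Serializable[] {start, end};     // boxed to Long
        }
        return params;
    }

    public static void main(String[] args) {
        for (Serializable[] p : splitRange(1, 10, 4)) {
            System.out.println(Arrays.toString(p));
        }
    }
}
```

Running main prints [1, 4], [5, 8] and [9, 10] on separate lines. The resulting array could then be passed to setParametersProvider(new GenericParameterValuesProvider(params)), as in the JavaDoc example, so that each pair is bound to the two ? placeholders of one partial query. Because the ranges are contiguous and disjoint, every id between minId and maxId is read exactly once; if ids are sparse, some ranges simply return fewer rows. The flink-jdbc module also ships a NumericBetweenParametersProvider for this pattern, but its constructor has changed between Flink versions, so check the JavaDoc of the version you use.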