Search code examples
oracleapache-flinkflink-sql

Reading data from oracle using Flink


I'm trying to use Flink to work with Oracle. Just do a simple task copy data from table to a new one.

 EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE ExistedTable(\n" +
                "    quoteid  BIGINT,\n" +
                "    requestid      BIGINT,\n" +
                "    createddt DATE,\n" +           
                "    PRIMARY KEY (quoteid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
                "    'table-name'    = 'TableName',\n" +
                "   'driver'     = 'oracle.jdbc.driver.OracleDriver',\n" +
                "    'username'    = 'UserName',\n" +
                "    'password'    = 'Password'\n" +
                ")");

        tEnv.executeSql("CREATE TABLE NewTable (\n" +
                "    quoteid  BIGINT,\n" +
                "    requestid      BIGINT,\n" +
                "    createddt DATE,\n" +           
                "    PRIMARY KEY (quoteid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
                "    'table-name'    = 'NewTableName',\n" +
                "   'driver'     = 'oracle.jdbc.driver.OracleDriver',\n" +
                "    'username'    = 'UserName',\n" +
                "    'password'    = 'Password'\n" +
                ")");

        Table data= tEnv.from("ExistedTable");
        data.executeInsert("NewTable");

When running I've got error

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.xxx'.

Table options are:

'connector'='jdbc'
'driver'='oracle.jdbc.OracleDriver'
'password'='******'
'table-name'='xxx'
'url'='jdbc:oracle:thin:@xxx:1521:xxx'
'username'='xxx'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.xxx'.

Is there any error in my sqlconnection. I couldn't found any example for working with oracle. Thanks,


Solution

  • Which version of Flink are you using? Support for Oracle JDBC is available since Flink 1.15, which hasn't been released yet.