Search code examples
apache-flinkflink-streamingflink-sql

Flink create table via table DSL


In order to create table, I use an SQL syntax like

    val tableEnv = StreamTableEnvironment.create(env, settings)
    tableEnv.executeSql(
      "CREATE TABLE asset (smth STRING) " +
        "WITH ('connector' = 'jdbc', " +
        "'url' = 'jdbc:mysql://host:3306/db', " +
        "'username' = 'user', " +
        "'password' = 'pass', " +
        "'table-name' = 'table')"
    ) 

Is there an option to define a table via Table API DSL?


Solution

  • You can create the very same table with the following Table API methods:

        Schema schema =
                Schema.newBuilder()
                        .column("smth", DataTypes.STRING())
                        .build();
        TableDescriptor tableDescriptor =
                TableDescriptor.forConnector("jdbc")
                        .option(JdbcConnectorOptions.URL, "jdbc:mysql://host:3306/db")
                        .option(JdbcConnectorOptions.USERNAME, "user")
                        .option(JdbcConnectorOptions.PASSWORD, "pass")
                        .option(JdbcConnectorOptions.TABLE_NAME, "table")
                        .schema(schema)
                        .build();
        tEnv.createTable("asset", tableDescriptor);
    

    To create a temporary table instead, use tEnv.createTemporaryTable.

    Check TableDescriptor and Schema for more details.