Tags: apache-kafka, apache-flink, flink-streaming, flink-sql

How to send unbounded TableResult to Kafka sink?


I am using the Table API to create two streams; let's call them A and B. I join the two tables with executeSql, and the output is a TableResult. I want to send the joined result to a Kafka sink. My code is below.

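import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();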
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String ddlUser = "CREATE TABLE UserTable (\n" +
        "id BIGINT,\n" +
        "name STRING\n" +
        ") WITH (\n" +
        "'connector' = 'kafka',\n" +
        "'topic' = 'USER',\n" +
        "'properties.bootstrap.servers' = 'pkc:9092',\n" +
        "'properties.group.id' = 'testGroup',\n" +
        "'scan.startup.mode' = 'earliest-offset',\n" +
        "'format' = 'json',\n" +
        "'properties.security.protocol' = 'SASL_SSL',\n" +
        "'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
        "'properties.sasl.mechanism' = 'PLAIN'\n" +
        ")";
tEnv.executeSql(ddlUser);
String ddlPurchase = "CREATE TABLE PurchaseTable (\n" +
        "transactionId BIGINT,\n" +
        "userID BIGINT,\n" +
        "item STRING\n" +
        ") WITH (\n" +
        "'connector' = 'kafka',\n" +
        "'topic' = 'PURCHASE',\n" +
        "'properties.bootstrap.servers' = 'pkc:9092',\n" +
        "'properties.group.id' = 'purchaseGroup',\n" +
        "'scan.startup.mode' = 'earliest-offset',\n" +
        "'format' = 'json',\n" +
        "'properties.security.protocol' = 'SASL_SSL',\n" +
        "'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
        "'properties.sasl.mechanism' = 'PLAIN'\n" +
        ")";
tEnv.executeSql(ddlPurchase);
String useQuery = "SELECT * FROM UserTable";
String purchaseQuery = "SELECT * FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id";
TableResult joinedData = tEnv.executeSql(purchaseQuery);



Solution

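  • A TableResult returned by executeSql for a SELECT can only be collected (collect()) or printed (print()) on the client; it is not a sink. To send the joined rows to Kafka, insert them into a destination table that is also backed by the Kafka connector: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#emit-a-table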

    The linked example creates a temporary table, but, as you already did for the sources, you can create a table with the Kafka connector (https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/) and insert the stream into it (untested, but it should look something like this):

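    // Table API: submits a job that continuously writes the query result to DestinationTable
    tEnv.sqlQuery(purchaseQuery).executeInsert("DestinationTable");

    or

    // SQL: same effect; Java string literals need double quotes
    tEnv.executeSql("INSERT INTO DestinationTable SELECT * FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id");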
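The answer assumes DestinationTable already exists. Below is a minimal sketch of the two statements needed, assuming the sink keeps all five columns produced by the join (types taken from the source DDLs), uses JSON, and writes to a hypothetical topic USER_PURCHASE. The class KafkaSinkStatements and its methods are illustrative helpers, not a Flink API; they only build SQL strings, so the sketch runs without Flink on the classpath.

```java
import java.util.List;
import java.util.StringJoiner;

/**
 * Hypothetical helper that builds the SQL needed to emit a query to Kafka:
 * a CREATE TABLE for a Kafka-backed sink and an INSERT INTO ... SELECT.
 * It only builds strings; a Flink job would pass them to tEnv.executeSql(...).
 */
public class KafkaSinkStatements {

    /** Builds CREATE TABLE DDL for a Kafka sink; SASL options mirror the question's source tables. */
    static String createSinkDdl(String table, List<String> columns, String topic, String bootstrapServers) {
        // Joins column definitions as "col TYPE,\ncol TYPE\n"
        StringJoiner cols = new StringJoiner(",\n", "", "\n");
        columns.forEach(cols::add);
        // Sink-only options: scan.startup.mode and properties.group.id apply to reading, so they are omitted
        return "CREATE TABLE " + table + " (\n"
                + cols
                + ") WITH (\n"
                + "'connector' = 'kafka',\n"
                + "'topic' = '" + topic + "',\n"
                + "'properties.bootstrap.servers' = '" + bootstrapServers + "',\n"
                + "'format' = 'json',\n"
                + "'properties.security.protocol' = 'SASL_SSL',\n"
                + "'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n"
                + "'properties.sasl.mechanism' = 'PLAIN'\n"
                + ")";
    }

    /** Wraps a SELECT in INSERT INTO so its rows are written to the sink table. */
    static String insertInto(String table, String selectQuery) {
        return "INSERT INTO " + table + " " + selectQuery;
    }

    public static void main(String[] args) {
        String ddl = createSinkDdl(
                "DestinationTable",
                List.of("transactionId BIGINT", "userID BIGINT", "item STRING", "id BIGINT", "name STRING"),
                "USER_PURCHASE", // hypothetical output topic
                "pkc:9092");
        String insert = insertInto("DestinationTable",
                "SELECT * FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id");
        System.out.println(ddl);
        System.out.println(insert);
    }
}
```

In the job, the DDL string would be passed to tEnv.executeSql(...) first, then the INSERT string; because the Kafka sources are unbounded, the INSERT starts a continuously running job rather than returning a finite result. This assumes the query only produces inserts, which the plain kafka connector requires; an inner join of two append-only tables should satisfy that, while a query that emits updates (for example an outer join) would likely need the upsert-kafka connector instead.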