I am using the Table API to create two streams, let's call them A and B. Using executeSql I am joining the two tables; the output is a TableResult. I want to send the joined result to a Kafka sink. Please find the code below.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String ddlUser = "CREATE TABLE UserTable (\n" +
"id BIGINT,\n" +
"name STRING\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'USER',\n" +
"'properties.bootstrap.servers' = 'pkc:9092',\n" +
"'properties.group.id' = 'testGroup',\n" +
"'scan.startup.mode' = 'earliest-offset',\n" +
"'format' = 'json',\n" +
"'properties.security.protocol' = 'SASL_SSL',\n" +
"'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
"'properties.sasl.mechanism' = 'PLAIN'\n" +
")";
tEnv.executeSql(ddlUser);
String ddlPurchase = "CREATE TABLE PurchaseTable (\n" +
"transactionId BIGINT,\n" +
"userID BIGINT,\n" +
"item STRING\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'PURCHASE',\n" +
"'properties.bootstrap.servers' = 'pkc:9092',\n" +
"'properties.group.id' = 'purchaseGroup',\n" +
"'scan.startup.mode' = 'earliest-offset',\n" +
"'format' = 'json',\n" +
"'properties.security.protocol' = 'SASL_SSL',\n" +
"'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
"'properties.sasl.mechanism' = 'PLAIN'\n" +
")";
tEnv.executeSql(ddlPurchase);
String userQuery = "SELECT * FROM UserTable";
String purchaseQuery = "SELECT * FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id";
TableResult joinedData = tEnv.executeSql(purchaseQuery);
How do I send an unbounded TableResult to a Kafka sink?
You need to insert into a destination table that is also backed by the Kafka connector: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#emit-a-table
In the example they create a temporary table, but, as you have already done, you can create a table with the Kafka connector (https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/) and have the joined stream inserted into it.
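A sketch of what that sink table's DDL could look like (untested; the column list just mirrors what SELECT * on your join produces, and the JOINED_OUTPUT topic name is an assumption):

// Sketch (untested): sink table whose schema mirrors the join's SELECT * output
String ddlDestination = "CREATE TABLE DestinationTable (\n" +
"transactionId BIGINT,\n" +
"userID BIGINT,\n" +
"item STRING,\n" +
"id BIGINT,\n" +
"name STRING\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'JOINED_OUTPUT',\n" + // assumed topic name
"'properties.bootstrap.servers' = 'pkc:9092',\n" +
"'format' = 'json',\n" +
"'properties.security.protocol' = 'SASL_SSL',\n" +
"'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
"'properties.sasl.mechanism' = 'PLAIN'\n" +
")";
tEnv.executeSql(ddlDestination);

Note that a sink table doesn't need scan.startup.mode or properties.group.id, since those are source-side options. Also, a plain inner join of two append-only Kafka sources is itself append-only, so the regular kafka connector works as a sink here; an outer join would emit updates and need the upsert-kafka connector instead.

Then have the stream inserted into it (haven't tested, but it should be something like this):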
tEnv.sqlQuery(purchaseQuery).executeInsert("DestinationTable");
or
tEnv.executeSql("INSERT INTO DestinationTable SELECT * FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id");
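The INSERT INTO job is submitted as soon as executeSql is called, so you don't need any extra trigger. One thing to watch: SELECT * emits every column from both tables (transactionId, userID, item, id, name), so the sink schema has to line up with that. If you only want a subset, list the columns explicitly and trim DestinationTable's schema to match, e.g. (a hypothetical column choice, untested):

tEnv.executeSql("INSERT INTO DestinationTable SELECT transactionId, item, name FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id");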