apache-flink flink-streaming flink-sql

Using a CTE in Apache Flink SQL


I am trying to write a SQL statement that uses a CTE in Flink.

I have a table defined as follows:

CREATE TABLE test_cte
    (
        pod                     VARCHAR,
        PRIMARY KEY (pod) NOT ENFORCED
    ) WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'test_cte',
          'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
          'properties.group.id' = 'test_cte_group_id',
          'value.format' = 'json',
          'key.format' = 'json',
          'properties.allow.auto.create.topics' = 'true',
          'properties.replication.factor' = '3',
          'value.json.timestamp-format.standard' = 'ISO-8601',
          'sink.parallelism' = '3'
          );

Then I have an insert statement:

WITH q1 AS ( SELECT pod FROM source ) 
 FROM q1
INSERT OVERWRITE TABLE test_cte
SELECT pod;

I get an error saying org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'FROM' at line 2, column 2.

The source table has the column pod.

When I run just the SELECT, like this:

WITH q1 AS ( SELECT pod FROM roles_deleted_raw_v1)
select * from q1;

I can see the result.


Solution

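  • This statement form, a CTE followed by FROM ... INSERT OVERWRITE TABLE ... SELECT, is Hive syntax and is only available in Flink when using the Hive dialect: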

    SET table.sql-dialect = hive;

    However, the Hive dialect is only supported with a HiveCatalog, so it cannot be used with an upsert-kafka table.

    For more info on this, you can check out the Flink docs.
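
    Since the plain WITH ... SELECT in the question already runs without switching dialects, one alternative worth trying is to keep the default dialect and move the WITH clause so that it follows INSERT INTO, making the CTE part of the query that feeds the sink. The statement below is a minimal sketch, not verified against a particular Flink version. It reuses the table and column names from the question, and it uses INSERT INTO on the assumption that the upsert-kafka sink does not accept INSERT OVERWRITE.

    ```sql
    -- Sketch only: assumes the default dialect accepts a CTE in the source query of an INSERT.
    -- test_cte, source and pod are the names used in the question.
    INSERT INTO test_cte
    WITH q1 AS (
        SELECT pod FROM source
    )
    SELECT pod FROM q1;
    ```

    If this parses, the CTE is scoped to the SELECT that follows it, in the same way as in the standalone query that already works.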