Tags: apache-flink, flink-streaming, flink-sql, apache-iceberg

Flink SQL OPTIONS for 'streaming'='true' doesn't work with JOIN


I'm writing a Flink job that reads from Iceberg, performs a join, and writes to another Iceberg table. The SQL I have works, but it runs once and then finishes. According to the documentation, I can make it run continuously and react to new Iceberg snapshots by adding this SQL hint: /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', 'starting-strategy'='YOUR_STARTING_STRATEGY_HERE') */

The documentation gives this example:

SELECT * 
FROM hive_table 
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

Looking at Iceberg flink documentation I found this

SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

With this example in mind, I wrote the following, which doesn't work:

INSERT INTO lake.gold_feedback_answers
SELECT
  a.`answer_id` AS `answer_id`,
  a.`answer_set_id` AS `answer_set_id`,
  a.`question_item_id` AS `question_item_id`,
  a.`answer` AS `answer`,
  a.`created` AS `answer_created`,
  aset.`created` AS `answerset_created`,
  aset.`respondant_id` AS `respondant_id`,
  aset.`completed` AS `answerset_completed`,
  s.`survey_id` AS `survey_id`,
  s.`customer_id` AS `customer_id`,
  s.`survey_start` AS `survey_start`,
  s.`survey_end` AS `survey_end`,
  s.`survey_language` AS `survey_language`,
  s.`archived` AS `survey_archived`
FROM
  lake.silver_filter_dbo_answers a
JOIN
  lake.silver_filter_dbo_answersets aset ON a.`answer_set_id` = aset.`answer_set_id`
JOIN
  lake.silver_filter_dbo_survey s ON aset.`survey_id` = s.`survey_id`
WHERE
  a.`op` = 'c'
  AND aset.`op` = 'c'
/*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', 'starting-strategy'='INCREMENTAL_FROM_LATEST_SNAPSHOT') */;

Exception is:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "/*+" at line 26, column 1.
Was expecting one of:
    <EOF> 
    "EXCEPT" ...
    ....

However, the hint does work in simpler cases. Example 1:

INSERT INTO mydb.mytable
  SELECT
      JSON_VALUE(data, '$.RowID') AS row_id,
      JSON_VALUE(data, '$.SetId') AS set_id, 
      created, 
      JSON_VALUE(data, '$.Op') AS op, 
      JSON_VALUE(data, '$.Deleted') AS deleted 
    FROM mydb.sometable
    /*+  OPTIONS('streaming'='true', 'monitor-interval'='10s')  */
    WHERE JSON_VALUE(data, '$.Op') = 'c' OR JSON_VALUE(data, '$.Op') IS NULL ;

Example 2:

INSERT INTO mydb.mytable
  SELECT
      JSON_VALUE(data, '$.RowID') AS row_id,
      JSON_VALUE(data, '$.SetId') AS set_id
    FROM mydb.sometable
    /*+  OPTIONS('streaming'='true', 'monitor-interval'='10s')  */;

I also tried setting these options directly in the environment and running the plain SQL without hints, but that still ran the query once and finished instead of running continuously.

tableEnv.getConfig().getConfiguration().setString("pipeline.monitor-interval", "10s");
tableEnv.getConfig().getConfiguration().setString("pipeline.streaming", "true");
tableEnv.getConfig().getConfiguration().setString("pipeline.streaming.enable", "true");
tableEnv.getConfig().getConfiguration().setString("streaming-source.enable", "true");
tableEnv.getConfig().getConfiguration().setString("streaming", "true");

What am I doing wrong? I tried placing the hint at different positions in the join query and searching the documentation, but the link and example I pasted seem to be the only references.


Solution

  • I believe the hint needs to come immediately after the table name it applies to, not at the end of the statement. An alternative is to specify the option in the CREATE TABLE DDL, as shown in the example in this section of the docs, e.g.,

    SET table.sql-dialect=hive;
    CREATE TABLE dimension_table (
      product_id STRING,
      update_user STRING,
      ...
    ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
      'streaming-source.enable' = 'true',
       ...
    );
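Applied to the query from the question, that hint placement would look roughly like this. This is an untested sketch: it assumes the OPTIONS hint belongs directly after each streaming source table reference, before its alias, and it reuses the table and column names from the question as-is.

```sql
INSERT INTO lake.gold_feedback_answers
SELECT
  a.`answer_id` AS `answer_id`,
  a.`answer_set_id` AS `answer_set_id`,
  a.`question_item_id` AS `question_item_id`,
  a.`answer` AS `answer`,
  a.`created` AS `answer_created`,
  aset.`created` AS `answerset_created`,
  aset.`respondant_id` AS `respondant_id`,
  aset.`completed` AS `answerset_completed`,
  s.`survey_id` AS `survey_id`,
  s.`customer_id` AS `customer_id`,
  s.`survey_start` AS `survey_start`,
  s.`survey_end` AS `survey_end`,
  s.`survey_language` AS `survey_language`,
  s.`archived` AS `survey_archived`
FROM
  -- hint placed directly after the table reference, before the alias
  lake.silver_filter_dbo_answers
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', 'starting-strategy'='INCREMENTAL_FROM_LATEST_SNAPSHOT') */ a
JOIN
  lake.silver_filter_dbo_answersets
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */ aset
    ON a.`answer_set_id` = aset.`answer_set_id`
JOIN
  lake.silver_filter_dbo_survey
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */ s
    ON aset.`survey_id` = s.`survey_id`
WHERE
  a.`op` = 'c'
  AND aset.`op` = 'c';
```

Whether every joined table needs the hint, or only the one you want read as an unbounded stream, depends on your setup; the key point is that the hint attaches to a table reference, so placing it at the end of the whole INSERT statement is a parse error.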