Tags: apache-flink, flink-sql, pyflink

How to use a field in ROW type column in Flink SQL?


I'm executing a SQL statement in Flink that looks like this:

    create table team_config_source (
      `payload` ROW(
        `before` ROW(
          team_config_id int,
          ...
        ),
        `after` ROW(
          team_config_id int,
          ...
        )
      ),
      PRIMARY KEY (`payload`.`after`.`team_config_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'xxx',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'xxx',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'key.format' = 'json'
    )

But Flink gives me this error:

    org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 51, column 29.
    Was expecting one of:
         ")" ...
         "," ...

I've also tried replacing (`payload`.`after`.`team_config_id`) with (`payload.after.team_config_id`), but then Flink says that the column payload.after.team_config_id is not defined.

How should I correct my DDL?


Solution

  • I worked around this problem by dropping the PRIMARY KEY clause.
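
With the PRIMARY KEY clause removed, the nested field can still be read in queries. As the parse error suggests, the clause accepts only plain column names, while ordinary expressions can follow a dotted path into a ROW. A minimal sketch, assuming the table from the question has been created without its PRIMARY KEY line (the alias `t` and the output column name are only illustrative):

    -- Read a field nested inside the ROW-typed `payload` column.
    -- The table alias `t` is an illustrative choice, not required by the question's DDL.
    SELECT
      t.`payload`.`after`.`team_config_id` AS team_config_id
    FROM team_config_source AS t;

The backticks around `before` and `after` are kept from the original DDL so that the field names are treated as identifiers.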