apache-flink flink-streaming apache-calcite flink-sql

Using ROW() for nested data structure


I've been successfully using JsonRowSerializationSchema from the flink-json artifact to create a TableSink<Row> and output JSON from SQL using ROW. It works great for emitting flat data:

INSERT INTO outputTable 
SELECT 
  ROW(col1, col1)
FROM inputTable
>>>> OK:
{"outCol1":"dasdasdas","outCol2":"dasdasdas"}

Now, I'm trying a nested schema and it breaks apart in a weird way:

INSERT INTO outputTable 
SELECT 
  ROW('ttt', ROW('ppp'))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"ppp"}}

INSERT INTO outputTable 
SELECT 
  ROW('ttt', ROW(col1))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"dasdasdas"}}

INSERT INTO outputTable 
SELECT 
  ROW(col1, ROW(col1))
FROM inputTable
>>>> KO

It is a parsing problem, but I'm baffled as to why it happens. col1 and 'ttt' are both String-typed expressions and should be interchangeable, but somehow the parser is thrown off by the ROW that follows, as the stack trace says:

Caused by: org.apache.calcite.sql.parser.impl.ParseException: Encountered ", ROW" at line 3, column 11.
Was expecting one of:
    ")" ...
    "," <IDENTIFIER> ...
    "," <QUOTED_IDENTIFIER> ...
    "," <BACK_QUOTED_IDENTIFIER> ...
    "," <BRACKET_QUOTED_IDENTIFIER> ...
    "," <UNICODE_QUOTED_IDENTIFIER> ...

    at org.apache.calcite.sql.parser.impl.SqlParserImpl.generateParseException(SqlParserImpl.java:23019)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.jj_consume_token(SqlParserImpl.java:22836)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.ParenthesizedSimpleIdentifierList(SqlParserImpl.java:4466)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression3(SqlParserImpl.java:3328)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2b(SqlParserImpl.java:3066)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2(SqlParserImpl.java:3092)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression(SqlParserImpl.java:3045)
    at ...

Am I missing something about the syntax? What is the parser trying to do? Should I be using ROW() in another way?

Is this a bug?


Solution

  • After further digging, I came to the following result: you just have to talk to ROW() nicely.

    This will work:

    INSERT INTO outputTable
    SELECT ROW(col1, col2) 
    FROM (
      SELECT 
        col1, 
        ROW(col1, col1) as col2 
      FROM inputTable
    ) tbl2
    

    Note:

    • The nesting: Maybe the SQL only allows one level of ROW nesting within a single expression, but you are allowed several table expressions. My take is that Flink currently does little to transform the SQL semantics before pushing the query to the execution engine. The execution plan fuses the result into a single ROW(col1, ROW(col1, col1)) anyway, so the extra subquery has little impact.
    • ROW(col1, col1): ROW(col1) in the secondary table (the subquery) will not work, although it works standalone in the first table. I don't know why. But do I really need a ROW when I only have one value? I can collapse it to that one value. If you have some leeway in the output schema, this won't be a problem.
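
As for what the parser is doing in the original failing query: the ParenthesizedSimpleIdentifierList frame in the stack trace suggests that once it sees ROW( followed by a plain identifier, it commits to a list of identifiers only, so a nested ROW after the comma is rejected. ROW('ttt', ...) starts with a literal and would take the general expression path instead. The Python sketch below is a toy model of that guess, not Calcite's actual grammar; the function names and the exact lookahead rule are my assumptions.

```python
import re

# Toy tokenizer: ROW keyword, identifiers, single-quoted literals, punctuation.
TOKEN = re.compile(r"\s*(ROW\b|[A-Za-z_]\w*|'[^']*'|[(),])")


class ParseError(Exception):
    pass


def tokenize(sql):
    sql, tokens, pos = sql.strip(), [], 0
    while pos < len(sql):
        m = TOKEN.match(sql, pos)
        if not m:
            raise ParseError(f"Unexpected input at {pos}")
        tokens.append(m.group(1))
        pos = m.end()
    return tokens


def peek(tokens, pos):
    return tokens[pos] if pos < len(tokens) else None


def is_identifier(tok):
    return tok is not None and tok != "ROW" and bool(re.fullmatch(r"[A-Za-z_]\w*", tok))


def is_literal(tok):
    return tok is not None and tok.startswith("'")


def expect(tokens, pos, wanted):
    if peek(tokens, pos) != wanted:
        raise ParseError(f"Expected {wanted!r} but found {peek(tokens, pos)!r}")
    return pos + 1


def parse_identifier_list(tokens, pos):
    # Identifier-only branch: "(" ident ("," ident)* ")". No nested expressions.
    pos = expect(tokens, pos, "(")
    items = [tokens[pos]]
    pos += 1
    while peek(tokens, pos) == ",":
        nxt = peek(tokens, pos + 1)
        if not is_identifier(nxt):
            raise ParseError(f'Encountered ", {nxt}"')
        items.append(nxt)
        pos += 2
    return ("ROW", items), expect(tokens, pos, ")")


def parse_general_list(tokens, pos):
    # General branch: "(" expr ("," expr)* ")". Nested ROW is allowed here.
    pos = expect(tokens, pos, "(")
    item, pos = parse_expression(tokens, pos)
    items = [item]
    while peek(tokens, pos) == ",":
        item, pos = parse_expression(tokens, pos + 1)
        items.append(item)
    return ("ROW", items), expect(tokens, pos, ")")


def parse_expression(tokens, pos):
    tok = peek(tokens, pos)
    if tok == "ROW":
        # ASSUMPTION: a fixed lookahead of "ROW ( <identifier>" commits the
        # parser to the identifier-only branch, with no backtracking.
        if peek(tokens, pos + 1) == "(" and is_identifier(peek(tokens, pos + 2)):
            return parse_identifier_list(tokens, pos + 1)
        return parse_general_list(tokens, pos + 1)
    if is_identifier(tok) or is_literal(tok):
        return tok, pos + 1
    raise ParseError(f"Unexpected token {tok!r}")


def parse(sql):
    tokens = tokenize(sql)
    tree, pos = parse_expression(tokens, 0)
    if pos != len(tokens):
        raise ParseError(f"Trailing input starting at {peek(tokens, pos)!r}")
    return tree


if __name__ == "__main__":
    for sql in ["ROW(col1, col1)", "ROW('ttt', ROW(col1))", "ROW(col1, ROW(col1))"]:
        try:
            print(sql, "->", parse(sql))
        except ParseError as err:
            print(sql, "-> KO:", err)
```

Under that assumption the toy accepts ROW(col1, col1) and ROW('ttt', ROW(col1)) but rejects ROW(col1, ROW(col1)), which matches what I observed. It does not explain why ROW(col1) fails inside the subquery.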

    I have submitted a JIRA issue here:

    https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11399

    I will update this post accordingly.