I've been successfully using JsonRowSerializationSchema from the flink-json artifact to create a TableSink<Row>
and output JSON from SQL using ROW. It works great for emitting flat data:
INSERT INTO outputTable
SELECT
ROW(col1, col1)
FROM inputTable
>>>> OK:
{"outCol1":"dasdasdas","outCol2":"dasdasdas"}
Now I'm trying a nested schema, and it breaks in a strange way:
INSERT INTO outputTable
SELECT
ROW('ttt', ROW('ppp'))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"ppp"}}
INSERT INTO outputTable
SELECT
ROW('ttt', ROW(col1))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"dasdasdas"}}
INSERT INTO outputTable
SELECT
ROW(col1, ROW(col1))
FROM inputTable
>>>> KO
It is a parsing problem, but I'm baffled as to why it happens. col1 and 'ttt' are both String-typed expressions and should be interchangeable, yet the parser somehow trips over the ROW that follows, as the stack trace says:
Caused by: org.apache.calcite.sql.parser.impl.ParseException: Encountered ", ROW" at line 3, column 11.
Was expecting one of:
")" ...
"," <IDENTIFIER> ...
"," <QUOTED_IDENTIFIER> ...
"," <BACK_QUOTED_IDENTIFIER> ...
"," <BRACKET_QUOTED_IDENTIFIER> ...
"," <UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.calcite.sql.parser.impl.SqlParserImpl.generateParseException(SqlParserImpl.java:23019)
at org.apache.calcite.sql.parser.impl.SqlParserImpl.jj_consume_token(SqlParserImpl.java:22836)
at org.apache.calcite.sql.parser.impl.SqlParserImpl.ParenthesizedSimpleIdentifierList(SqlParserImpl.java:4466)
at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression3(SqlParserImpl.java:3328)
at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2b(SqlParserImpl.java:3066)
at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2(SqlParserImpl.java:3092)
at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression(SqlParserImpl.java:3045)
at ...
Am I missing something about the syntax? What is the parser trying to do? Should I be using ROW() in another way?
Is this a bug?
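After further digging, I found a workaround: you just have to talk to ROW() nicely. Build the inner ROW in a subquery, give it an alias, and reference that alias from the outer ROW, so that each ROW() call contains only plain column names.
This will work: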
INSERT INTO outputTable
SELECT ROW(col1, col2)
FROM (
SELECT
col1,
ROW(col1, col1) as col2
FROM inputTable
) tbl2
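If the same trick holds for deeper nesting, a three-level schema could presumably be built by stacking one subquery per level. The following is an untested sketch, not something confirmed in the JIRA issue: the aliases lvl2, lvl3 and tbl3 are made up for illustration, and it assumes the sink's schema has been declared with a matching three-level structure.

```sql
-- Hypothetical three-level variant of the workaround (untested sketch).
-- lvl2, lvl3 and tbl3 are illustrative names; inputTable, outputTable,
-- col1 and tbl2 come from the queries above.
INSERT INTO outputTable
SELECT ROW(col1, lvl2)            -- outer ROW sees only plain identifiers
FROM (
  SELECT
    col1,
    ROW(col1, lvl3) AS lvl2       -- middle level, again identifiers only
  FROM (
    SELECT
      col1,
      ROW(col1, col1) AS lvl3     -- innermost level, same shape as the working flat case
    FROM inputTable
  ) tbl3
) tbl2
```

The idea is that every ROW() call lists only simple column names, which is the one form the stack trace shows the parser accepting when the first argument is an identifier.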
Note:
I have submitted a JIRA issue here:
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11399
I will update this post accordingly.