Search code examples
apache-flinkflink-sqlflink-batch

Flink Sql query that creates a MAP


I have a working Spark SQL query in this format :

String flinkSqlQuery =
    "SELECT " +
    "  MAP( " +
    "    'mId', memberId, " +
    "    'rId', IFNULL(readId, ''), " +
    "    'VIEWED', 1, " +
    "    'UDF1', UDF1(param1) " +
    "  ) AS requestContext, " +
    "  CAST(1.0 AS FLOAT) AS FIXVAL, " +
    "  CAST(0.0 AS FLOAT) AS RESPONSE " +
    "FROM testingTable";

I am trying to convert it into Flink Sql to run using Flink's Table API :

String flinkSqlQuery =
    "SELECT " +
    "  MAP( " +
    "    'mId', memberId, " +
    "    'rId', COALESCE(readId, ''), " +
    "    'VIEWED', 1, " +
    "    'UDF1', UDF1(param1) " +
    "  ) AS requestContext, " +
    "  CAST(1.0 AS FLOAT) AS FIXVAL, " +
    "  CAST(0.0 AS FLOAT) AS RESPONSE " +
    "FROM testingTable";

But I am encountering the following error :

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL parse failed. Non-query expression encountered in illegal context

Please suggest if this is this the right way to create a MAP in Flink Sql.


Solution

  • The syntax used by Flink SQL for creating a map (or an array) involves square brackets rather than parentheses. So it should be like this, instead:

        String flinkSqlQuery =
            "SELECT " +
            "  MAP[ " +
            "    'mId', memberId, " +
            "    'rId', IFNULL(readId, ''), " +
            "    'VIEWED', 1, " +
            "    'UDF1', UDF1(param1) " +
            "  ] AS requestContext, " +
            "  CAST(1.0 AS FLOAT) AS FIXVAL, " +
            "  CAST(0.0 AS FLOAT) AS RESPONSE " +
            "FROM testingTable";
    

    See the documentation for more info.