Tags: apache-flink, pyflink

PyFlink - Scala UDF - How to convert Scala Map in Table API?


I'm trying to map the Map[String,String] output of my Scala UDF (scala.collection.immutable.Map) to a valid data type in the Table API, namely via the Java type java.util.Map, as recommended here: Flink Table API & SQL and map types (Scala). However, I get the error below.

Any idea about the right way to proceed? And if so, is there a way to generalize the conversion to a (nested) Scala object of type Map[String,Any]?

Code

Scala UDF

class dummyMap() extends ScalarFunction {
  def eval() = {
    val whatevermap = Map("key1" -> "val1", "key2" -> "val2")
    whatevermap.asInstanceOf[java.util.Map[java.lang.String,java.lang.String]]
  }
}

Sink

my_sink_ddl = f"""
    create table mySink (
        output_of_dummyMap_udf MAP<STRING,STRING>
    ) with (
        ...
    )
"""

Error

Py4JJavaError: An error occurred while calling o430.execute.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType<java.util.Map>]
TableSink schema:    [output_of_my_scala_udf: Map<String, String>]

Thanks!


Solution

  • Original answer from Wei Zhong. I'm just the reporter. Thanks, Wei!

    As of this writing (Flink 1.11), two methods work:

    • Current: @DataTypeHint in the UDF definition + registering the UDF via a SQL statement
    • Outdated: overriding getResultType in the UDF definition + registering the UDF via t_env.register_java_function

    Code

    Scala UDF

    package com.dummy
    
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.annotation.DataTypeHint
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.functions.ScalarFunction
    import org.apache.flink.types.Row
    
    class dummyMap extends ScalarFunction {
    
      // If the UDF is registered via a SQL statement, you need to add this type hint
      @DataTypeHint("ROW<s STRING,t STRING>")
      def eval(): Row = {
    
        Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
    
      }
    
      // If the UDF is registered via the method 'register_java_function', you need to override this method.
      override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
        // The type of the return values should be TypeInformation
        Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
      }
    }
    

    Python code

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment
    
    s_env = StreamExecutionEnvironment.get_execution_environment()
    st_env = StreamTableEnvironment.create(s_env)
    
    # load the Scala UDF jar file; modify the path to point at your own jar
    # or you can also load the jar file via other approaches
    st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
    
    # register the UDF via a SQL DDL statement
    st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
    # or register it via the register_java_function method
    # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
    
    # prepare source and sink
    t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    st_env.execute_sql("""create table mySink (
            output_of_my_scala_udf ROW<s STRING,t STRING>
        ) with (
            'connector' = 'print'
        )""")
    
    # execute query
    t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
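Note (not part of Wei Zhong's answer): if you specifically want the MAP<STRING,STRING> column from the question's original sink DDL rather than a ROW, the same Flink 1.11 type-hint mechanism should also accept a map hint on an eval method returning java.util.Map. The sketch below is untested; the class name dummyMapAsMap is made up, and the Scala map is converted with .asJava instead of being cast with asInstanceOf (a scala.collection.immutable.Map is not a java.util.Map, so the cast cannot work).

package com.dummy

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

import scala.collection.JavaConverters._

class dummyMapAsMap extends ScalarFunction {

  // The hint tells the planner the exact SQL type, so the sink column
  // can be declared as MAP<STRING,STRING> instead of ROW<s STRING,t STRING>.
  @DataTypeHint("MAP<STRING, STRING>")
  def eval(): java.util.Map[String, String] = {
    // Build the Scala map, then convert it to a java.util.Map view
    Map("key1" -> "val1", "key2" -> "val2").asJava
  }
}

Registration would follow the same pattern as above, and the sink column would be declared as MAP<STRING,STRING>, exactly as in the question's original DDL.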