
PyFlink UDAF InternalRow vs. Row


I'm trying to call an external function from a custom UDAF in PyFlink. That function requires its input as a dictionary, so I tried to pass the fields as a single structured value using row(t.rowtime, t.b, t.c).cast(schema) and convert it to a dictionary inside the UDAF.

Outside the UDAF, this expression works as expected. Inside the UDAF, however, the value arrives as an InternalRow, which cannot be converted into a dictionary.

Is there a way to force UDAF to use Row instead of InternalRow?

from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment, AggregateFunction, DataTypes
from pyflink.table.expressions import row
from pyflink.table.udf import udaf

from datetime import datetime

class TestAccumulator(AggregateFunction):
    def create_accumulator(self):
        return Row(last_type="")

    def accumulate(self, accumulator, value):
        accumulator["last_type"] = str(type(value))

    def get_value(self, accumulator):
        return accumulator["last_type"]

    def get_result_type(self):
        return DataTypes.STRING()

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("last_type", DataTypes.STRING()),
        ])


if __name__ == "__main__":
    # create a blink streaming TableEnvironment
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    table_env = TableEnvironment.create(env_settings)

    schema = DataTypes.ROW([
        DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("b", DataTypes.STRING()),
        DataTypes.FIELD("c", DataTypes.STRING()),
    ])


    my_udaf = udaf(TestAccumulator())

    t = table_env.from_elements([(datetime(1970, 1, 1, 0, 0, 0), 'Hi', 'Hello'),
                                 (datetime(1970, 1, 1, 1, 0, 0), 'Hi', 'hi'),
                                 (datetime(1970, 1, 1, 2, 0, 0), 'Hi2', 'hi'),
                                 (datetime(1970, 1, 1, 3, 0, 0), 'Hi', 'Hello'),
                                 (datetime(1970, 1, 1, 4, 0, 0), 'Hi', 'Hello')], schema=schema)

    print(
        t.select(my_udaf(row(t.rowtime, t.b, t.c).cast(schema)).alias("udaf")).to_pandas().values
    )


Output:

[["<class 'pyflink.fn_execution.coder_impl_fast.InternalRow'>"]]

Solution

Thanks for reporting the issue. This is a bug. I have created a JIRA ticket, https://issues.apache.org/jira/browse/FLINK-23121, to fix it. It will be fixed in release 1.13.2.
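Until a release containing the fix is available, one possible workaround, which I have not verified against an affected Flink version, is to avoid passing a ROW into the UDAF at all: call it as my_udaf(t.rowtime, t.b, t.c) and build the dictionary inside accumulate(). The sketch below is plain Python that mirrors the create_accumulator/accumulate/get_value method names so the logic can be checked without a Flink cluster. The names external_fn and DictBuildingAggregate are hypothetical. In PyFlink you would subclass AggregateFunction instead, keep get_result_type and get_accumulator_type as in the question, and return a Row from create_accumulator.

```python
from datetime import datetime
from typing import Any, Dict, List

# Field names taken from the schema in the question.
FIELD_NAMES = ("rowtime", "b", "c")


def external_fn(record: Dict[str, Any]) -> str:
    # Hypothetical stand-in for the outer function that requires a dict.
    return f"{record['b']}/{record['c']}"


class DictBuildingAggregate:
    """Hypothetical plain-Python mirror of the AggregateFunction methods.

    accumulate() takes each field as its own argument instead of one ROW
    value, so it never sees an InternalRow and can build the dict itself.
    """

    def create_accumulator(self) -> List[Any]:
        # One slot for the latest result, analogous to Row(last_type="").
        return [""]

    def accumulate(self, accumulator: List[Any], rowtime: datetime, b: str, c: str) -> None:
        # Pair each positional argument with its schema field name.
        record = dict(zip(FIELD_NAMES, (rowtime, b, c)))
        accumulator[0] = external_fn(record)

    def get_value(self, accumulator: List[Any]) -> str:
        return accumulator[0]


if __name__ == "__main__":
    # Same rows as the from_elements call in the question.
    rows = [
        (datetime(1970, 1, 1, 0, 0, 0), "Hi", "Hello"),
        (datetime(1970, 1, 1, 1, 0, 0), "Hi", "hi"),
        (datetime(1970, 1, 1, 2, 0, 0), "Hi2", "hi"),
        (datetime(1970, 1, 1, 3, 0, 0), "Hi", "Hello"),
        (datetime(1970, 1, 1, 4, 0, 0), "Hi", "Hello"),
    ]
    agg = DictBuildingAggregate()
    acc = agg.create_accumulator()
    for rowtime, b, c in rows:
        agg.accumulate(acc, rowtime, b, c)
    print(agg.get_value(acc))  # prints Hi/Hello
```

The trade-off is that the UDAF signature is tied to the exact field list, so adding a column means changing both the call site and accumulate(). After upgrading to a version with the fix, the ROW argument should arrive as a pyflink.common.Row, whose as_dict() method returns a dictionary directly.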