I'm trying to call an external function from inside a custom UDAF in PyFlink. The function requires its input to be a dictionary, so I use row(t.rowtime, t.b, t.c).cast(schema) to pass the columns as a single structured value. Outside the UDAF this expression works well, but inside the UDAF it arrives as an InternalRow, which cannot be converted into a dictionary. Is there a way to force the UDAF to receive a Row instead of an InternalRow?
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment, AggregateFunction, DataTypes
from pyflink.table.expressions import row, col, lit, row_interval
from pyflink.table.window import Tumble
from pyflink.table.udf import udaf
from datetime import datetime, date, time


class TestAccumulator(AggregateFunction):

    def create_accumulator(self):
        return Row(last_type="")

    def accumulate(self, accumulator, value):
        # Record the Python type of the value the UDAF actually receives.
        accumulator["last_type"] = str(type(value))

    def get_value(self, accumulator):
        return accumulator["last_type"]

    def get_result_type(self):
        return DataTypes.STRING()

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("last_type", DataTypes.STRING()),
        ])


if __name__ == "__main__":
    # create a blink streaming TableEnvironment
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    table_env = TableEnvironment.create(env_settings)

    schema = DataTypes.ROW([
        DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("b", DataTypes.STRING()),
        DataTypes.FIELD("c", DataTypes.STRING()),
    ])

    my_udaf = udaf(TestAccumulator())

    t = table_env.from_elements([(datetime(1970, 1, 1, 0, 0, 0), 'Hi', 'Hello'),
                                 (datetime(1970, 1, 1, 1, 0, 0), 'Hi', 'hi'),
                                 (datetime(1970, 1, 1, 2, 0, 0), 'Hi2', 'hi'),
                                 (datetime(1970, 1, 1, 3, 0, 0), 'Hi', 'Hello'),
                                 (datetime(1970, 1, 1, 4, 0, 0), 'Hi', 'Hello')], schema=schema)

    print(
        t.select(my_udaf(row(t.rowtime, t.b, t.c).cast(schema)).alias("udaf")).to_pandas().values
    )
Output:
[["<class 'pyflink.fn_execution.coder_impl_fast.InternalRow'>"]]
Thanks for reporting the issue. It is a bug. I have created a JIRA ticket, https://issues.apache.org/jira/browse/FLINK-23121, to fix it. It will be fixed in release 1.13.2.
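Until 1.13.2 is available, one possible workaround is to avoid passing a ROW into the UDAF at all: pass the columns as separate arguments and build the dictionary inside accumulate, so no Row/InternalRow conversion is involved. The sketch below is untested against your exact setup, and external_function is a hypothetical stand-in for the dict-consuming function in your code:

from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udaf


def external_function(record):
    # Hypothetical stand-in for the function that requires a dictionary.
    return str(sorted(record.keys()))


class DictAccumulator(AggregateFunction):

    def create_accumulator(self):
        return Row(last_result="")

    def accumulate(self, accumulator, rowtime, b, c):
        # Assemble the dict by hand from plain column values, so no ROW
        # (and hence no InternalRow) crosses the UDAF boundary.
        record = {"rowtime": rowtime, "b": b, "c": c}
        accumulator["last_result"] = external_function(record)

    def get_value(self, accumulator):
        return accumulator["last_result"]

    def get_result_type(self):
        return DataTypes.STRING()

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("last_result", DataTypes.STRING()),
        ])


my_udaf = udaf(DictAccumulator())
# Usage: t.select(my_udaf(t.rowtime, t.b, t.c).alias("udaf"))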