I'm using PyFlink 1.13 for a project and I'm trying to do the following:
Here's a visual representation of the data flow I'm trying to achieve:
I'm using PyFlink's Table API, and both of my tables were declared with SQL DDL.
My query looks like this:
```sql
SELECT UserId, Timestamp, my_udf(Data) AS Result
FROM InputTable
GROUP BY TUMBLE(Timestamp, INTERVAL '2' SECONDS), UserId, Data
```
Here's my Python UDF:
```python
from pyflink.table.udf import udf

@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
    # ...business logic here with window_data
    return some_result
```
My current problem is that, for some reason, the `my_udf` function receives each row separately, so in the example above it is called 4 times instead of 2.
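To see where the 4 calls come from, here is a plain-Python sketch (no PyFlink involved) that mimics how the `GROUP BY` clause partitions rows. The original diagram isn't available, so the four sample rows, their millisecond timestamps, and the helper names are hypothetical: two users, each with two rows carrying different `Data` values inside the same 2-second window.

```python
from collections import defaultdict

# Hypothetical sample rows: (UserId, timestamp in ms, Data)
ROWS = [
    ("alice", 500, "a1"),
    ("alice", 1200, "a2"),
    ("bob", 700, "b1"),
    ("bob", 1900, "b2"),
]

WINDOW_SIZE_MS = 2000  # matches the 2-second tumbling window


def window_start(ts_ms):
    """Start of the tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % WINDOW_SIZE_MS


def group_rows(rows, include_data):
    """Partition rows like GROUP BY TUMBLE(...), UserId[, Data]."""
    groups = defaultdict(list)
    for user_id, ts_ms, data in rows:
        key = (window_start(ts_ms), user_id)
        if include_data:
            key += (data,)
        groups[key].append(data)
    return dict(groups)


# Also grouping by Data: every row becomes its own group.
with_data = group_rows(ROWS, include_data=True)
# Grouping by window and UserId only: one group per user per window.
without_data = group_rows(ROWS, include_data=False)

print(len(with_data), len(without_data))  # 4 2
```

Because `Data` is part of the grouping key, each distinct value forms its own group, and the function is evaluated once per group.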
I've been looking through the PyFlink docs, but I can't find how to achieve this.
The information is probably there, but I seem to have failed to find or understand it.
Any help would be appreciated.
Thanks!
If I understand correctly what you are trying to do, you want to modify your query so that it groups by neither the `Data` column nor the raw `Timestamp`:
```sql
SELECT UserId, TUMBLE_END(Timestamp, INTERVAL '2' SECONDS), my_udf(Data) AS Result
FROM InputTable
GROUP BY TUMBLE(Timestamp, INTERVAL '2' SECONDS), UserId
```
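and then implement a user-defined aggregate function (UDAF) that aggregates the `Data` values of all rows in the window for a given user into a single value. A scalar UDF like the one in the question always produces one value per input row; an aggregate function is what receives all the rows of a group. There's an example in the docs that I've linked to above.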