Tags: python, apache-flink, pyflink

Windowed grouping using message keys in PyFlink


I'm using PyFlink 1.13 for a project and I'm trying to do the following:

  • Read data from a Kafka topic whose messages contain a UserId
  • Apply a 2-second tumbling window to the data
  • Call a Python UDF with each window's values

Here's a visual representation of the data flow I'm trying to achieve:

[Figure: diagram of the intended data flow (image not included)]

I'm using PyFlink's Table API and both of my tables were declared using the SQL DDL.

My query execution looks like this:

SELECT UserId, `Timestamp`, my_udf(Data) AS Result
FROM InputTable
GROUP BY TUMBLE(`Timestamp`, INTERVAL '2' SECONDS), UserId, Data

Here's my Python UDF:

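from pyflink.table.udf import udf

# SOME_INPUT_TYPE / SOME_OUTPUT_TYPE stand for the actual DataTypes used
@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
    # ...business logic here with window_data
    return some_result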

My current problem is that, for some reason, my_udf receives each row separately, so in the example above it is called 4 times instead of 2.
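
To see why the grouping keys decide how many times the function runs, here is a plain-Python simulation (no Flink involved) of bucketing rows into 2-second tumbling windows. The sample rows, the `(user_id, ts_seconds, data)` tuple layout and the `count_groups` helper are hypothetical. Because every sample row has a distinct `Data` value, adding `Data` to the grouping key turns every row into its own group, which matches the 4 calls described above.

```python
from collections import defaultdict

WINDOW_SECONDS = 2  # tumbling window size used in the query

# Hypothetical sample: two users, two messages each, all inside the window [0, 2)
rows = [
    ("user_a", 0.1, "a1"),
    ("user_a", 1.2, "a2"),
    ("user_b", 0.5, "b1"),
    ("user_b", 1.9, "b2"),
]


def window_start(ts: float) -> int:
    """Start of the tumbling window containing ts (aligned to multiples of the size)."""
    return int(ts // WINDOW_SECONDS) * WINDOW_SECONDS


def count_groups(rows, include_data: bool) -> dict:
    """Bucket rows by (window, user[, data]); Flink emits one output row per bucket."""
    groups = defaultdict(list)
    for user_id, ts, data in rows:
        key = (window_start(ts), user_id)
        if include_data:
            key += (data,)  # mimics adding Data to the GROUP BY clause
        groups[key].append(data)
    return dict(groups)


per_row = count_groups(rows, include_data=True)    # GROUP BY window, UserId, Data
per_user = count_groups(rows, include_data=False)  # GROUP BY window, UserId
print(len(per_row), len(per_user))  # 4 2
print(per_user)
```

With `Data` in the key there are four single-row groups; without it there are two groups, one per user, each holding both of that user's values.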

I've looked through the PyFlink docs, but I haven't been able to find how to achieve this.

The information is probably in the docs, but I seem to have failed to find or understand it.

Any help would be appreciated.

Thanks!


Solution

  • If I understand correctly what you are trying to do, you want to modify your query so that it groups by neither the Data column nor the raw Timestamp column:

    SELECT UserId, TUMBLE_END(`Timestamp`, INTERVAL '2' SECONDS), my_udf(Data) AS Result
    FROM InputTable
    GROUP BY TUMBLE(`Timestamp`, INTERVAL '2' SECONDS), UserId
    

    and then implement my_udf as a user-defined aggregate function (UDAF) that combines the Data values of all rows in a window for a given user into a single value. There's an example in the PyFlink documentation on user-defined aggregate functions.
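
The shape of such an aggregate function can be sketched in plain Python. The class below uses the same method names that PyFlink's `AggregateFunction` expects (`create_accumulator`, `accumulate`, `get_value`), but it does not import Flink. The `run_tumbling_aggregation` driver is a hypothetical stand-in for what Flink does with `GROUP BY TUMBLE(...), UserId`, and joining the `Data` values into a string is placeholder business logic. In a real job you would subclass `pyflink.table.udf.AggregateFunction`, also implement `get_result_type` and `get_accumulator_type`, and wrap it with `udaf(...)` as the PyFlink docs describe.

```python
from collections import defaultdict
from typing import Iterable, List, Tuple

WINDOW_SECONDS = 2


class ConcatData:
    """Mirrors PyFlink's AggregateFunction method names (plain Python, not Flink)."""

    def create_accumulator(self) -> List[str]:
        # One fresh accumulator per (user, window) group
        return []

    def accumulate(self, accumulator: List[str], data: str) -> None:
        # Called once for every row that falls into the group
        accumulator.append(data)

    def get_value(self, accumulator: List[str]) -> str:
        # Called once per group when the window fires (placeholder logic)
        return ",".join(accumulator)


def run_tumbling_aggregation(rows: Iterable[Tuple[str, float, str]], agg) -> dict:
    """Hypothetical driver: group rows by (user, window end), one result per group."""
    accumulators = defaultdict(agg.create_accumulator)
    for user_id, ts, data in rows:
        # Exclusive end of the tumbling window, like TUMBLE_END
        window_end = (int(ts // WINDOW_SECONDS) + 1) * WINDOW_SECONDS
        agg.accumulate(accumulators[(user_id, window_end)], data)
    return {key: agg.get_value(acc) for key, acc in accumulators.items()}


rows = [
    ("user_a", 0.1, "a1"),
    ("user_a", 1.2, "a2"),
    ("user_b", 0.5, "b1"),
    ("user_b", 1.9, "b2"),
    ("user_a", 2.3, "a3"),  # falls into the next window, [2, 4)
]
results = run_tumbling_aggregation(rows, ConcatData())
print(results)
# {('user_a', 2): 'a1,a2', ('user_b', 2): 'b1,b2', ('user_a', 4): 'a3'}
```

The point of the split into three methods is that per-row work happens in `accumulate`, while `get_value` runs once per user per window, which is the "2 calls instead of 4" behaviour the question asks for.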