apache-flink flink-streaming

Flink windowAll: why is aggregate slower than window process?


We aggregate data over 1-minute windows and then flush the result to a file. The data is like a map whose keys and values are both objects.

Since we need to flush the data together, we do not use keyBy and use windowAll instead.

The problem is that we get better throughput when we use the window with a ProcessAllWindowFunction and aggregate inside the process call than when we use aggregate on the window. We are also seeing state checkpoint timeouts when we use aggregate.

I went through the code base, and the only hypothesis I could come up with is that the ListState used by process is easier to checkpoint than the AggregatingState used by aggregate.

Is the hypothesis correct? Are we doing something wrong? If not, is there a way to improve the performance on aggregate?


Solution

  • Based on what you've said, I'm going to jump to some conclusions.

    I assume you are using the RocksDB state backend, and are aggregating each incoming event into some sort of collection. In that case, the RocksDB state backend has to deserialize that collection, add the new event to it, and then re-serialize it -- for every event. This is very expensive.

    When you use a ProcessAllWindowFunction, each incoming event is appended to a ListState object, which has a very efficient implementation -- the serialized bytes for the new event are simply appended (the list doesn't have to be deserialized and re-serialized).

    Checkpoints are timing out because the throughput is so poor.

    Switching to the FsStateBackend would help, since it keeps working state as objects on the JVM heap and so avoids the per-event serialization. Or use a ProcessAllWindowFunction. Or implement your own windowing with a KeyedProcessFunction, and then use ListState or MapState for the aggregation.
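
To make the cost difference described above concrete, here is a rough plain-Java model. It is not Flink or RocksDB code. The class name StateCostSketch, the per-key counting aggregate, and the hand-rolled serialization are all assumptions made for illustration. It compares a collection-valued accumulator that is deserialized and re-serialized for every event against list-style state where only the new event's bytes are appended, and it counts serialized bytes touched as a crude stand-in for work done.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical cost model for illustration; not Flink or RocksDB code. */
public class StateCostSketch {

    /** Final per-key counts plus the number of serialized bytes read or written. */
    public static final class Result {
        public final Map<String, Long> counts;
        public final long bytesTouched;

        Result(Map<String, Long> counts, long bytesTouched) {
            this.counts = counts;
            this.bytesTouched = bytesTouched;
        }
    }

    /** Writes the whole map: entry count, then (key, value) pairs. */
    static byte[] serialize(Map<String, Long> map) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(map.size());
        for (Map.Entry<String, Long> e : map.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
        }
        return bytes.toByteArray();
    }

    /** Reads back a map written by serialize. */
    static Map<String, Long> deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int size = in.readInt();
        Map<String, Long> map = new HashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            map.put(key, in.readLong());
        }
        return map;
    }

    /** Models a collection-valued accumulator: read, modify, write on every event. */
    public static Result aggregatePerEvent(String[] keys) {
        try {
            long touched = 0;
            byte[] state = serialize(new HashMap<>());
            for (String key : keys) {
                touched += state.length;            // the whole accumulator is read
                Map<String, Long> acc = deserialize(state);
                acc.merge(key, 1L, Long::sum);
                state = serialize(acc);
                touched += state.length;            // the whole accumulator is rewritten
            }
            return new Result(deserialize(state), touched);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Models list-style state: append on every event, aggregate once at the end. */
    public static Result appendThenAggregate(String[] keys) {
        try {
            ByteArrayOutputStream list = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(list);
            for (String key : keys) {
                out.writeUTF(key);                  // only the new event is serialized
            }
            byte[] data = list.toByteArray();
            long touched = 2L * data.length;        // written once, read once
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, Long> counts = new HashMap<>();
            while (in.available() > 0) {
                counts.merge(in.readUTF(), 1L, Long::sum);
            }
            return new Result(counts, touched);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Synthetic workload: 5,000 events spread over 200 distinct keys.
        String[] keys = new String[5_000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "key-" + (i % 200);
        }
        Result perEvent = aggregatePerEvent(keys);
        Result appended = appendThenAggregate(keys);
        System.out.println("same result:       " + perEvent.counts.equals(appended.counts));
        System.out.println("per-event bytes:   " + perEvent.bytesTouched);
        System.out.println("append-only bytes: " + appended.bytesTouched);
    }
}
```

Both paths end with the same counts, but in the first one the bytes touched per event grow with the size of the accumulator, while in the second they depend only on the size of the event. Byte counts are only a proxy here; actual numbers in a job depend on the serializer, the state backend, and the shape of the data.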