apache-flink, flink-streaming

Apache Flink: accessing keyed state from a late window


I am writing a Flink application that consumes time series data from a Kafka topic. Each data point has a metric name, tag key-value pairs, a timestamp and a value. I have created a tumbling window that aggregates the data by a metric key (a combination of the metric name, the key-value pairs and the timestamp). Here is what the main stream looks like:

Kafka source -> FlatMap (parses and emits Metric) -> keyBy(metric key) -> 60-second tumbling window -> aggregate the data -> sink
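The main pipeline above can be sketched in the Flink 1.x DataStream API roughly as follows. This is a minimal sketch, not the asker's code. It rests on four assumptions: `Metric` is a hypothetical POJO with `key`, `timestamp` and `value` fields, the aggregation is assumed to be a sum, `fromElements` stands in for the Kafka source plus parser, and `print()` stands in for the sink.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MainPipeline {

    /** Hypothetical parsed metric; the field names are assumptions. */
    public static class Metric {
        public String key;      // metric name + tags
        public long timestamp;  // event time, epoch millis
        public double value;

        public Metric() {}

        public Metric(String key, long timestamp, double value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Assumed aggregation: sum of values per key and window. */
    public static class SumAggregate implements AggregateFunction<Metric, Double, Double> {
        @Override public Double createAccumulator() { return 0.0; }
        @Override public Double add(Metric m, Double acc) { return acc + m.value; }
        @Override public Double getResult(Double acc) { return acc; }
        @Override public Double merge(Double a, Double b) { return a + b; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for "Kafka source -> FlatMap which parses and emits Metric".
        DataStream<Metric> metrics = env
                .fromElements(
                        new Metric("cpu{host=a}", 1_000L, 1.0),
                        new Metric("cpu{host=a}", 2_000L, 2.0))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Metric>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((m, previous) -> m.timestamp));

        metrics
                .keyBy(m -> m.key)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .aggregate(new SumAggregate())
                .print(); // stand-in for the real sink

        env.execute("metric-aggregation");
    }
}
```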

I also want to detect metrics that arrive late, i.e. after the window above has already closed. I want to know how many metrics arrive late and what percentage of all metrics they make up. I am thinking of using Flink's late-data handling ("allowedLateness", together with a side output for late data) to send the late metrics to a different stream. I am planning to add a "MapState" to the main "Aggregate the data" operator, keyed by the metric key, whose value is the count of metrics that arrived in the main window.

Kafka source -> FlatMap (parses and emits Metric) -> keyBy(metric key)
    -> 60-second tumbling window -> aggregate the data (maintain a MapState of metric counts) -> sink
         \
          late data -> keyBy(metric key) -> collect late metrics and compute the late percentage -> sink
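The late-data branch in the diagram above can be sketched like this in the Flink 1.x DataStream API. Again this is a minimal sketch, and `Metric`, `CountAggregate` and `LATE_METRICS` are hypothetical names.

One detail is worth flagging. In the DataStream API, `sideOutputLateData` is what routes late elements to a separate stream. `allowedLateness` only keeps window state around so that late elements trigger additional firings, and only elements later than the allowed lateness reach the side output. The sketch leaves allowed lateness at its default of zero, so every element arriving after its window has fired goes to the side output and the main counts stay strictly on-time.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateBranchPipeline {

    /** Hypothetical parsed metric; the field names are assumptions. */
    public static class Metric {
        public String key;
        public long timestamp;
        public double value;

        public Metric() {}

        public Metric(String key, long timestamp, double value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Counts the on-time metrics per key and window. */
    public static class CountAggregate implements AggregateFunction<Metric, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Metric m, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // Anonymous subclass so Flink can capture the side output's element type.
    static final OutputTag<Metric> LATE_METRICS = new OutputTag<Metric>("late-metrics") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source and parser.
        DataStream<Metric> metrics = env
                .fromElements(new Metric("cpu{host=a}", 1_000L, 1.0))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Metric>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((m, previous) -> m.timestamp));

        SingleOutputStreamOperator<Long> onTimeCounts = metrics
                .keyBy(m -> m.key)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .sideOutputLateData(LATE_METRICS) // elements behind a fired window go here
                .aggregate(new CountAggregate());

        DataStream<Metric> lateMetrics = onTimeCounts.getSideOutput(LATE_METRICS);

        onTimeCounts.print(); // stand-in for the main sink
        lateMetrics.print();  // stand-in for the late-data branch
        env.execute("metric-aggregation-with-late-branch");
    }
}
```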

My question: can the "Collect late metrics and find the percentage of late metrics" operator access the "MapState" that the main stream updates? Even though both are keyed by the same metric key, I assume they run as two different tasks. I want to calculate (number of late metrics) / (number of late metrics + number of metrics that arrived on time).


Solution

  • There are several different ways you could approach this.

    You could store the per-window state in the KeyedStateStore returned by windowState() on the Context passed to your ProcessWindowFunction. Combined with allowedLateness, this lets you compute the late-event statistics as the late firings occur. (No need for MapState with this approach: windowState is already scoped to a specific window and a specific key, so ValueState will suffice.)
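The windowState approach above might look roughly like the following. This is a minimal sketch under several assumptions.

- It assumes the default `EventTimeTrigger`, which fires once when the watermark passes the window end and then once per late element within `allowedLateness`. The first firing is therefore treated as the on-time one.
- A window that only ever receives late elements would be misclassified.
- `process()` buffers all elements, so there is no incremental aggregation.

`Metric`, `LateStatsFunction` and `latePercent` are hypothetical names, and the Flink 1.x API is assumed.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowStateLateStats {

    /** Hypothetical parsed metric; the field names are assumptions. */
    public static class Metric {
        public String key;
        public long timestamp;
        public double value;

        public Metric() {}
    }

    /** Late share as a percentage: late / (late + onTime) * 100. */
    static double latePercent(long late, long onTime) {
        long total = late + onTime;
        return total == 0 ? 0.0 : 100.0 * late / total;
    }

    public static class LateStatsFunction
            extends ProcessWindowFunction<Metric, String, String, TimeWindow> {

        // Per-key, per-window ValueState: the element count seen at the first (on-time) firing.
        private final ValueStateDescriptor<Long> onTimeCountDesc =
                new ValueStateDescriptor<>("on-time-count", Long.class);

        @Override
        public void process(String key, Context ctx, Iterable<Metric> elements,
                            Collector<String> out) throws Exception {
            long total = 0;
            for (Metric ignored : elements) {
                total++; // everything assigned to this window so far, on-time plus late
            }
            ValueState<Long> onTimeCount = ctx.windowState().getState(onTimeCountDesc);
            Long onTime = onTimeCount.value();
            if (onTime == null) {
                // First firing: assumed to be the on-time firing.
                onTimeCount.update(total);
                out.collect(key + " " + ctx.window() + " on-time=" + total);
            } else {
                // Late firing: every element beyond the on-time count arrived late.
                long late = total - onTime;
                out.collect(key + " " + ctx.window() + " late=" + late
                        + " late%=" + latePercent(late, onTime));
            }
        }

        @Override
        public void clear(Context ctx) throws Exception {
            // Called when the window is purged once the allowed lateness has expired.
            ctx.windowState().getState(onTimeCountDesc).clear();
        }
    }
}
```

It would be wired in with something like `.window(TumblingEventTimeWindows.of(Time.seconds(60))).allowedLateness(Time.minutes(5)).process(new WindowStateLateStats.LateStatsFunction())`, where the five-minute lateness is an arbitrary choice.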

    Another idea would be to capture a side output stream of the late events from the primary window and send those late events through another window that counts them over some time frame. Then send both that late-event analytics stream and the output of the first (main) window into a KeyedCoProcessFunction (or RichCoFlatMapFunction) that can compute the late vs. on-time statistics. (Here you will need MapState, since several windows may be open simultaneously for each key of the keyed stream.)
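A minimal sketch of the KeyedCoProcessFunction half of the approach above follows. It rests on the following assumptions:

- Both windows emit a hypothetical `WindowCount` record carrying the key, the start of the original 60-second event-time window, and a count.
- The second window that counts late events is a processing-time window (e.g. `TumblingProcessingTimeWindows`). Side-output elements keep their original timestamps, so an event-time window would treat them as late again.
- State cleanup (timers or state TTL) is left out.

`WindowCount`, `LateRatioFunction` and `latePercent` are hypothetical names, and the Flink 1.x `open(Configuration)` signature is assumed.

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class LateVsOnTimeJoin {

    /** Hypothetical record emitted by both windows: one count for one key and one original window. */
    public static class WindowCount {
        public String key;
        public long windowStart; // start of the original 60 s event-time window
        public long count;

        public WindowCount() {}

        public WindowCount(String key, long windowStart, long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.count = count;
        }
    }

    static double latePercent(long late, long onTime) {
        long total = late + onTime;
        return total == 0 ? 0.0 : 100.0 * late / total;
    }

    /** Input 1: on-time counts from the main window. Input 2: late counts from the second window. */
    public static class LateRatioFunction
            extends KeyedCoProcessFunction<String, WindowCount, WindowCount, String> {

        // MapState keyed by window start, because several original windows can be open per key.
        private transient MapState<Long, Long> onTimeByWindow;
        private transient MapState<Long, Long> lateByWindow;

        @Override
        public void open(Configuration parameters) {
            onTimeByWindow = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("on-time", Long.class, Long.class));
            lateByWindow = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("late", Long.class, Long.class));
        }

        @Override
        public void processElement1(WindowCount c, Context ctx, Collector<String> out) throws Exception {
            onTimeByWindow.put(c.windowStart, c.count);
            emitIfComplete(c.key, c.windowStart, out);
        }

        @Override
        public void processElement2(WindowCount c, Context ctx, Collector<String> out) throws Exception {
            // Late counts for one original window may arrive in several batches; accumulate them.
            Long previous = lateByWindow.get(c.windowStart);
            lateByWindow.put(c.windowStart, (previous == null ? 0L : previous) + c.count);
            emitIfComplete(c.key, c.windowStart, out);
        }

        private void emitIfComplete(String key, long windowStart, Collector<String> out) throws Exception {
            Long onTime = onTimeByWindow.get(windowStart);
            Long late = lateByWindow.get(windowStart);
            if (onTime == null || late == null) {
                return; // wait until both inputs have reported for this window
            }
            out.collect(key + " window@" + windowStart + " late%=" + latePercent(late, onTime));
        }
    }
}
```

The two count streams could then be joined with something like `mainCounts.connect(lateCounts).keyBy(c -> c.key, c -> c.key).process(new LateVsOnTimeJoin.LateRatioFunction())`.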

    Or you could use a simple ProcessFunction to split the initial stream into two, one for the late events and one for the on-time events (by comparing each event's timestamp to the current watermark), and then use Flink SQL to compute all of the statistics.
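The splitting step of the approach above could be sketched as a ProcessFunction with a side output. This is a minimal sketch: the Flink SQL part is not shown, and `Metric`, `LATE`, `isLate` and `LateSplitter` are hypothetical names.

It must run after timestamps and watermarks are assigned. Note that classifying events against the watermark itself is stricter than the window operator's notion of lateness: an event can already be behind the watermark and still land in a window that has not fired yet.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class WatermarkSplit {

    /** Hypothetical parsed metric; the field names are assumptions. */
    public static class Metric {
        public String key;
        public long timestamp;
        public double value;

        public Metric() {}
    }

    // Side output for events that are already behind the watermark.
    public static final OutputTag<Metric> LATE = new OutputTag<Metric>("late") {};

    /** An event is late if the watermark has already reached its timestamp. */
    static boolean isLate(long eventTimestamp, long currentWatermark) {
        return eventTimestamp <= currentWatermark;
    }

    public static class LateSplitter extends ProcessFunction<Metric, Metric> {
        @Override
        public void processElement(Metric m, Context ctx, Collector<Metric> out) {
            Long ts = ctx.timestamp(); // null if no timestamps were assigned upstream
            long watermark = ctx.timerService().currentWatermark();
            if (ts != null && isLate(ts, watermark)) {
                ctx.output(LATE, m); // late stream
            } else {
                out.collect(m);      // on-time stream
            }
        }
    }
}
```

The two streams would be obtained as `SingleOutputStreamOperator<Metric> onTime = metrics.process(new WatermarkSplit.LateSplitter())` and `onTime.getSideOutput(WatermarkSplit.LATE)`, and could then be registered as tables for the SQL step.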

    Or just implement the whole thing in one KeyedProcessFunction. See https://ci.apache.org/projects/flink/flink-docs-stable/docs/learn-flink/event_driven/ for an example.
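A single KeyedProcessFunction along the lines of the event-driven example in the linked docs might look roughly like this. Per key, it counts on-time and late events for each 60-second window in MapState. It then emits the late percentage once a lateness horizon has passed; the horizon of five minutes is a hypothetical value.

This is a minimal sketch with several simplifications:

- The main aggregation (e.g. a sum) is omitted.
- The element's own `timestamp` field is assumed to match its event time.
- The Flink 1.x API is assumed.

`Metric`, `CountsFunction`, `windowEnd` and `LATE_HORIZON_MS` are hypothetical names.

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LateAwareCounts {

    static final long WINDOW_MS = 60_000L;
    // Hypothetical: how long after a window closes late events are still counted.
    static final long LATE_HORIZON_MS = 5 * 60_000L;

    /** Hypothetical parsed metric; the field names are assumptions. */
    public static class Metric {
        public String key;
        public long timestamp; // assumed to equal the assigned event time
        public double value;

        public Metric() {}
    }

    /** Exclusive end of the 60 s tumbling window containing the timestamp. */
    static long windowEnd(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MS) + WINDOW_MS;
    }

    static double latePercent(long late, long onTime) {
        long total = late + onTime;
        return total == 0 ? 0.0 : 100.0 * late / total;
    }

    public static class CountsFunction extends KeyedProcessFunction<String, Metric, String> {
        private transient MapState<Long, Long> onTime; // window end -> on-time count
        private transient MapState<Long, Long> late;   // window end -> late count

        @Override
        public void open(Configuration parameters) {
            onTime = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("on-time", Long.class, Long.class));
            late = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("late", Long.class, Long.class));
        }

        @Override
        public void processElement(Metric m, Context ctx, Collector<String> out) throws Exception {
            long end = windowEnd(m.timestamp);
            long watermark = ctx.timerService().currentWatermark();
            long finalTimer = end - 1 + LATE_HORIZON_MS;
            if (finalTimer <= watermark) {
                return; // beyond the horizon: this window's statistics were already emitted
            }
            // The window [end - 60 s, end) counts as closed once the watermark reaches end - 1.
            MapState<Long, Long> target = (end - 1 <= watermark) ? late : onTime;
            Long current = target.get(end);
            target.put(end, (current == null ? 0L : current) + 1);
            // Flink keeps at most one timer per key and timestamp, so re-registering is harmless.
            ctx.timerService().registerEventTimeTimer(finalTimer);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            long end = timestamp + 1 - LATE_HORIZON_MS; // each final timer maps to exactly one window
            Long on = onTime.get(end);
            Long lt = late.get(end);
            long onCount = on == null ? 0L : on;
            long lateCount = lt == null ? 0L : lt;
            out.collect(ctx.getCurrentKey() + " window-end=" + end + " on-time=" + onCount
                    + " late=" + lateCount + " late%=" + latePercent(lateCount, onCount));
            onTime.remove(end);
            late.remove(end);
        }
    }
}
```

It would be applied after timestamps and watermarks are assigned, e.g. `metrics.keyBy(m -> m.key).process(new LateAwareCounts.CountsFunction())`.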