Tags: apache-kafka, apache-flink, flink-streaming

Flink ProcessWindowFunction emits records with partial information


We are seeing some weird behaviour with a ProcessWindowFunction that emits two records: the first contains complete information, built from the data aggregated in the window, while the second contains only partial information, with some fields missing.

The ProcessWindowFunction uses keyed state (a MapState) as follows:

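import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import scala.collection.mutable

// Keyed MapState obtained through the runtime context
@transient private var cfState: MapState[(String, Int), mutable.Map[Int, mutable.Set[Int]]] = _

override def open(parameters: Configuration): Unit = {
  cfState = getRuntimeContext.getMapState(
    new MapStateDescriptor[(String, Int), mutable.Map[Int, mutable.Set[Int]]](
      "customFieldsState",
      classOf[(String, Int)],
      classOf[mutable.Map[Int, mutable.Set[Int]]]
    )
  )
}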

and the process method reads and updates this state using the records in the window.

Is using state within a ProcessWindowFunction an anti-pattern? Are there any other recommendations for using state within a ProcessWindowFunction?

We need to maintain state in this case because a single window does not capture all the fields, and we need to aggregate the records per user, hence the use of a window function.

Thanks


Solution

  • If you want to maintain state beyond the lifetime of a single window instance, you should use ProcessWindowFunction.Context#globalState, which returns a KeyedStateStore.

    All other state is cleared when the window is closed.

    Since Flink never clears globalState, you should set a state TTL on the state descriptor you use if some of your keys will go stale; otherwise that state will leak over time.
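A minimal sketch of the globalState approach, assuming the Flink 1.x Scala DataStream API (deprecated since Flink 1.17) and Scala 2.12. FieldEvent, UserFields, CustomFieldsWindowFunction, the (source, slot) state key and the 1-day TTL are hypothetical stand-ins for the asker's own types and retention needs; they are not from the original question. It obtains the same kind of MapState as in the question, but through Context#globalState inside process(), and enables TTL on the descriptor so entries for stale keys eventually expire.

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, StateTtlConfig}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical input and output types, for illustration only.
case class FieldEvent(userId: String, source: String, slot: Int, fieldId: Int, valueId: Int)
case class UserFields(userId: String, fields: Map[Int, Set[Int]])

object CustomFieldsWindowFunction {
  type CfKey = (String, Int)
  type CfValue = mutable.Map[Int, mutable.Set[Int]]

  // Descriptor for per-key state that outlives individual windows.
  // The 1-day TTL is an arbitrary, assumed value: each map entry expires
  // one day after it was created or last written.
  def cfStateDescriptor(): MapStateDescriptor[CfKey, CfValue] = {
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.days(1))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build()
    val descriptor = new MapStateDescriptor[CfKey, CfValue](
      "customFieldsState", classOf[CfKey], classOf[CfValue])
    descriptor.enableTimeToLive(ttlConfig)
    descriptor
  }
}

class CustomFieldsWindowFunction
    extends ProcessWindowFunction[FieldEvent, UserFields, String, TimeWindow] {
  import CustomFieldsWindowFunction._

  // Built once per function instance; the state handle is fetched per call.
  private val descriptor = cfStateDescriptor()

  override def process(
      userId: String,
      context: Context,
      elements: Iterable[FieldEvent],
      out: Collector[UserFields]): Unit = {
    // globalState is scoped to the current key (here, the user) across all
    // windows; Flink never clears it, so the TTL above bounds its growth.
    val cfState: MapState[CfKey, CfValue] = context.globalState.getMapState(descriptor)

    // Merge this window's records into the accumulated per-user state.
    for (e <- elements) {
      val key: CfKey = (e.source, e.slot)
      val fields: CfValue =
        Option(cfState.get(key)).getOrElse(mutable.Map.empty[Int, mutable.Set[Int]])
      fields.getOrElseUpdate(e.fieldId, mutable.Set.empty[Int]) += e.valueId
      // Write the value back: with RocksDB, get returns a deserialized copy,
      // so mutating it in place would not be persisted.
      cfState.put(key, fields)
    }

    // Emit an immutable snapshot of everything accumulated so far for this user.
    val snapshot = mutable.Map.empty[Int, Set[Int]]
    for (entry <- cfState.entries().asScala; (fieldId, values) <- entry.getValue) {
      snapshot(fieldId) = snapshot.getOrElse(fieldId, Set.empty[Int]) ++ values
    }
    out.collect(UserFields(userId, snapshot.toMap))
  }
}
```

The function would be applied after keying the stream by user, for example events.keyBy(_.userId).window(...).process(new CustomFieldsWindowFunction), with org.apache.flink.streaming.api.scala._ imported for the implicit TypeInformation; the window assigner is whatever the job already uses. State handles cannot be taken from Context in open(), which is why globalState is looked up inside process().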