apache-flink flink-streaming

Flink - Java class members in keyed process function


I have the following Flink KeyedProcessFunction. I am basically trying to implement the State design pattern.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Event1 and Event2 are the application's own event types (not shown).
public class AlertProcessor extends KeyedProcessFunction<Tuple2<String, String>, Event1, Event2> {

   private static final Logger logger = LoggerFactory.getLogger(AlertProcessor.class);

   private transient AlertState currentAlertState;
   private transient AlertState activeAlertState;
   private transient AlertState noActiveAlertState;
   private transient AlertState resolvedAlertState;

   @Override
   public void open(Configuration parameters) {
      activeAlertState = new ActiveAlertState();
      noActiveAlertState = new NoActiveAlertState();
      resolvedAlertState = new ResolvedAlertState();
   }

   @Override
   public void processElement(Event1 event1, Context ctx, Collector<Event2> out) throws Exception {

        // Would the below if condition work for multiple keys?
        if (currentAlertState == null) {
            currentAlertState = noActiveAlertState;
        }

        currentAlertState.handle(event1, out);
   }

   private interface AlertState {
        void handle(Event1 event1, Collector<Event2> out);
   }

   private class ActiveAlertState implements AlertState {
       @Override
       public void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to resolved alert state");

           // Do something and push some Event2 to out
           currentAlertState = resolvedAlertState;
       }
   }

   private class NoActiveAlertState implements AlertState {
       @Override
       public void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to active alert state");

           // Do something and push some Event2 to out
           currentAlertState = activeAlertState;
       }
   }

   private class ResolvedAlertState implements AlertState {
       @Override
       public void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to no active alert state");

           // Do something and push some Event2 to out
           currentAlertState = noActiveAlertState;
       }
   }

}

My questions are:

  1. Would there be one AlertProcessor instance (or object) per key in the stream? In other words, is the currentAlertState object unique per key, or will there be one currentAlertState per instance of this AlertProcessor operator?

    If currentAlertState is per instance of the operator, then this code would not really work because currentAlertState would be overwritten for different keys. Is my understanding correct?

  2. I could store currentAlertState in keyed state and initialize it on every processElement() call. If I do that, I don't need to assign currentAlertState to the next state in the handle() implementations, because currentAlertState will be initialized from what is in Flink state anyway.

  3. Is there a better way to implement the State design pattern in Flink while still reducing the number of state objects created?
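For the third question, one option (an assumption on my part, not taken from the original answer) is to implement the states as enum constants. Each constant is a singleton with no fields, so no state objects are allocated per key, and handle() returns the next state instead of assigning a member variable. This is a plain-Java sketch with no Flink dependency: the Map stands in for keyed state, and the String events and List<String> output are hypothetical stand-ins for Event1, Event2 and Collector<Event2>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * State pattern with an enum: each constant is a stateless singleton, so no state
 * objects are created per key; only the enum value itself is stored per key.
 * String events and the List<String> output are hypothetical stand-ins for Event1/Event2.
 */
public class EnumStatePattern {

    enum AlertStatus {
        NO_ACTIVE {
            @Override
            AlertStatus handle(String event, List<String> out) {
                out.add("raised:" + event);
                return ACTIVE;
            }
        },
        ACTIVE {
            @Override
            AlertStatus handle(String event, List<String> out) {
                out.add("resolved:" + event);
                return RESOLVED;
            }
        },
        RESOLVED {
            @Override
            AlertStatus handle(String event, List<String> out) {
                out.add("reset:" + event);
                return NO_ACTIVE;
            }
        };

        /** Emits output for the event and returns the next state instead of assigning a field. */
        abstract AlertStatus handle(String event, List<String> out);
    }

    // Stand-in for keyed state: one entry per distinct key (in Flink, a ValueState<AlertStatus>).
    private final Map<String, AlertStatus> statusByKey = new HashMap<>();

    /** Reads the key's current status, lets it handle the event, and stores the returned next status. */
    void process(String key, String event, List<String> out) {
        AlertStatus current = statusByKey.getOrDefault(key, AlertStatus.NO_ACTIVE);
        statusByKey.put(key, current.handle(event, out));
    }

    AlertStatus statusOf(String key) {
        return statusByKey.get(key);
    }

    public static void main(String[] args) {
        EnumStatePattern processor = new EnumStatePattern();
        List<String> out = new ArrayList<>();
        processor.process("a", "e1", out);
        processor.process("b", "e2", out);
        processor.process("a", "e3", out);
        System.out.println(out);                     // [raised:e1, raised:e2, resolved:e3]
        System.out.println(processor.statusOf("a")); // RESOLVED
        System.out.println(processor.statusOf("b")); // ACTIVE
    }
}
```

Because handle() returns the next state rather than mutating a shared field, the pattern fits keyed state directly: read the stored value, call handle(), and write the result back. In a KeyedProcessFunction that would mean a ValueState<AlertStatus>; Flink can serialize enum types, so only the constant is kept per key.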


Solution

  • A single AlertProcessor instance will be created in each parallel instance of your pipeline (each task slot), and it will be multiplexed over all of the keys handled by that slot.

    If currentAlertState is per instance of the operator, then this code would not really work because currentAlertState would be overwritten for different keys. Is my understanding correct?

    Correct. You should use keyed state for currentAlertState, which will result in one entry in the state backend for each distinct key.
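A plain-Java model (no Flink dependency; all class names are hypothetical) makes this concrete: one processor object receives events for keys "a" and "b", the way a single parallel instance of AlertProcessor would. SharedFieldProcessor keeps the current state in a member field, as in the question; PerKeyProcessor keeps one entry per key in a Map that stands in for keyed state. It also shows that re-reading the state at the start of each call is not enough on its own: the transition still has to be written back, or every call would see the old value.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model (no Flink): one processor object receives events for several keys,
 * as a single parallel instance of a KeyedProcessFunction does.
 */
public class PerKeyStateDemo {

    enum Phase { NO_ACTIVE, ACTIVE, RESOLVED }

    static Phase next(Phase p) {
        switch (p) {
            case NO_ACTIVE: return Phase.ACTIVE;
            case ACTIVE:    return Phase.RESOLVED;
            default:        return Phase.NO_ACTIVE; // RESOLVED -> NO_ACTIVE
        }
    }

    /** Mirrors the question: one member field shared by every key this instance handles. */
    static class SharedFieldProcessor {
        private Phase current;

        Phase process(String key) { // the key is ignored, just as the member field ignores it
            if (current == null) {
                current = Phase.NO_ACTIVE;
            }
            Phase handledIn = current;
            current = next(current);
            return handledIn;
        }
    }

    /** Mirrors keyed state: one stored entry per distinct key (a Map stands in for ValueState). */
    static class PerKeyProcessor {
        private final Map<String, Phase> stateByKey = new HashMap<>();

        Phase process(String key) {
            Phase handledIn = stateByKey.getOrDefault(key, Phase.NO_ACTIVE); // read at the start of each call
            stateByKey.put(key, next(handledIn));                             // write the transition back
            return handledIn;
        }
    }

    public static void main(String[] args) {
        SharedFieldProcessor shared = new SharedFieldProcessor();
        PerKeyProcessor perKey = new PerKeyProcessor();
        for (String key : new String[] {"a", "b"}) {
            System.out.println(key + ": shared=" + shared.process(key) + ", perKey=" + perKey.process(key));
        }
        // a: shared=NO_ACTIVE, perKey=NO_ACTIVE
        // b: shared=ACTIVE, perKey=NO_ACTIVE
    }
}
```

In an actual KeyedProcessFunction the Map would be replaced by a ValueState obtained in open() through getRuntimeContext().getState(...) with a ValueStateDescriptor. Flink scopes value() and update() to the key of the element currently being processed, so there is no explicit lookup by key.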