apache-flink, flink-streaming

Can different Flink tasks share the same variable within one task manager?


I have a read-only parameter p, such as a machine learning model. Suppose I use the distributed cache to place the file on each task manager, so that each task can load it locally.

With map(new MyMapFunction(p)), p is serialized and shipped to every parallel operator instance; if it is instead loaded from the cached file, each task still loads its own instance of p. With 4 task managers of 8 slots each, flink run -p 32 uses all the resources, and p ends up with 32 instances.

In theory, 4 instances of p should be enough: one per task manager, shared by every thread in that task manager. Is this possible in Flink?


Solution

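  • I used the following approach to initialize a structure shared by all tasks in a task manager (TM). It works because the slots of a TM are threads in the same JVM, and a Scala object is a singleton within that JVM: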

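    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction

    class EventProcess extends ProcessFunction[Event, Event] {

      ...

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)

        // Called once per parallel task; only the first call in this JVM initializes
        EventProcess.init()
      }

      ...
    }

    // A Scala object is a singleton per JVM (per classloader), so all tasks in one TM share it
    object EventProcess {

      // Dedicated monitor; a String literal is interned and could be locked by unrelated code
      val lock = new Object
      var data: Any = _

      def init(): Unit = {
        lock.synchronized {
          if (data == null) {
            // do init
          }
        }
      }
    }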
    

    In your case, if you need to take something from the RuntimeContext inside open() and use it to initialize your object's var, you can synchronize inside open():

    
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)

      EventProcess.lock.synchronized {
        if (EventProcess.YOUR_VAR == null) {
          EventProcess.init(getRuntimeContext()...)
        }
      }
    }
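
To see the effect without a cluster, here is a minimal sketch in plain Scala, with no Flink dependency, that imitates 8 slots as 8 threads in one JVM. The names Model, SharedModel, loadCount and the path "model.bin" are hypothetical and only for illustration; in a real job, get would be called from open() and load would read the file from the distributed cache. It uses double-checked locking on a @volatile field as one possible alternative to the lock object above.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the read-only parameter p (e.g. a model).
final class Model(val weights: Vector[Double])

object SharedModel {
  // Counts how often the expensive load really runs (demo only).
  val loadCount = new AtomicInteger(0)

  // @volatile makes the fully built Model visible to all threads.
  @volatile private var instance: Model = _

  // Hypothetical loader; real code would read the cached file at `path`.
  private def load(path: String): Model = {
    loadCount.incrementAndGet()
    new Model(Vector(0.1, 0.2, 0.3))
  }

  // Double-checked locking: only the first caller in this JVM pays for load().
  def get(path: String): Model = {
    var m = instance
    if (m == null) {
      this.synchronized {
        m = instance
        if (m == null) {
          m = load(path)
          instance = m
        }
      }
    }
    m
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val slots = 8 // imitates 8 task slots, i.e. 8 threads in one TM JVM
    val pool = Executors.newFixedThreadPool(slots)
    val done = new CountDownLatch(slots)
    val seen = ConcurrentHashMap.newKeySet[Model]() // distinct instances observed

    for (_ <- 1 to slots) {
      pool.execute(() => {
        seen.add(SharedModel.get("model.bin")) // what open() would do in each task
        done.countDown()
      })
    }

    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    println(s"loads = ${SharedModel.loadCount.get}, distinct instances = ${seen.size}")
  }
}
```

All 8 threads end up holding the same Model reference and load runs once. Applied to the question, that would give 4 instances for 4 task managers instead of 32. Note that the sharing is per classloader, so it is worth checking how your deployment loads user code.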