I have a read-only parameter p, such as a machine learning model. Suppose I use the distributed cache to put the file on each task manager, so that each task can load it locally.
If I write map(new MyMapFunction(p)), p is serialized and deserialized for each operator instance. If the file is cached and loaded instead, each task still loads its own instance of p. Suppose I have 4 task managers with 8 slots each: I could run flink run -p 32 to use all the resources, and there would be 32 instances of p.
In theory, I suppose 4 instances of p would be enough, with all threads in the same task manager sharing one instance. Is this possible in Flink?
I used the following approach to initialize the per-TM common structure:
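import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction

class EventProcess extends ProcessFunction[Event, Event] {
  ...
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // Every parallel instance calls this; only the first call in the JVM does the work.
    EventProcess.init()
  }
  ...
}

// The companion object is initialized once per JVM (strictly, once per class loader),
// so all slots of a task manager share it.
object EventProcess {
  // Dedicated lock object: a string literal is interned and could be locked by unrelated code.
  val lock = new Object
  var data: Any = _

  def init(): Unit = {
    lock.synchronized {
      if (data == null) {
        // do init
      }
    }
  }
}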
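To see why this gives one instance per JVM rather than one per slot, here is a minimal sketch in plain Scala with no Flink dependency. The names Model, SharedModel and SlotSimulation are hypothetical, and the threads only stand in for the parallel open() calls that the slots of one task manager would make.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the read-only parameter p (e.g. a model).
final class Model(val weights: Vector[Double])

object SharedModel {
  private val lock = new Object
  private var data: Model = null
  // Counts how often the expensive load actually runs.
  val loadCount = new AtomicInteger(0)

  // Same lock-and-null-check pattern as above, returning the shared instance.
  def init(): Model = lock.synchronized {
    if (data == null) {
      loadCount.incrementAndGet()
      data = new Model(Vector(0.1, 0.2, 0.3)) // pretend this reads the cached file
    }
    data
  }
}

object SlotSimulation {
  // Starts `slots` threads that all call init() and returns the distinct instances they saw.
  def simulate(slots: Int): Set[Model] = {
    val pool = Executors.newFixedThreadPool(slots)
    val results = new Array[Model](slots)
    val done = new CountDownLatch(slots)
    (0 until slots).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          results(i) = SharedModel.init()
          done.countDown()
        }
      })
    }
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    // Model has reference equality, so the set size is the number of distinct instances.
    results.toSet
  }
}
```

Calling SlotSimulation.simulate(8) should return a set with a single Model, and the load runs only once. Since every slot then reads the same object from its own thread, the shared structure has to be immutable or otherwise safe for concurrent reads.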
In your case, if you need to take something from the RuntimeContext inside open() and use it to initialize the var in your object, you can synchronize inside open() itself:
override def open(parameters: Configuration): Unit = {
  super.open(parameters)
  EventProcess.lock.synchronized {
    if (EventProcess.YOUR_VAR == null) {
      EventProcess.init(getRuntimeContext()...)
    }
  }
}
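If you would rather not expose the lock and the var to every caller, one possible variant is to hide both behind a helper that takes the initializer as a by-name argument. This is only a sketch: PerJvm and ModelHolder are hypothetical names, and the strings stand in for whatever you would build from the RuntimeContext.

```scala
// Hypothetical holder for one lazily created value per JVM (per class loader).
final class PerJvm[T <: AnyRef] {
  private var value: Option[T] = None

  // `create` is a by-name parameter: it is evaluated only if no value exists yet.
  def getOrInit(create: => T): T = this.synchronized {
    value match {
      case Some(existing) => existing
      case None =>
        val created = create
        value = Some(created)
        created
    }
  }
}

object ModelHolder {
  // Shared by every function instance running in the same JVM.
  val shared = new PerJvm[String]
}
```

Inside open() you would then call ModelHolder.shared.getOrInit(...) with the expression that builds the structure. Note that whichever task calls first wins and later initializers are never evaluated, so the value must not depend on which subtask happens to create it.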