I want to use a non-serializable object in stream.map(), like this:
stream.map { i =>
  val obj = new SomeUnserializableClass()
  obj.doSomething(i)
}
This is very inefficient, because it creates a new SomeUnserializableClass
instance for every element. It really only needs to be created once per worker.
In Spark, I can use mapPartitions to do this, but I don't know how to do it in the Flink streaming API.
If you are dealing with a non-serializable class, I recommend creating a RichFunction; in your case, a RichMapFunction.
A rich function in Flink has an open method that runs on the TaskManager once per parallel instance, as an initializer, before any elements are processed.
So the trick is to make your field transient and instantiate it in the open method.
See the example below:
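import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class NonSerializableFieldMapFunction extends RichMapFunction<Object, Object> {
    // transient: skipped when Flink serializes the function to ship it to the workers
    private transient SomeUnserializableClass someUnserializableClass;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Created once per parallel instance, before any element is processed
        this.someUnserializableClass = new SomeUnserializableClass();
    }

    @Override
    public Object map(Object o) throws Exception {
        return someUnserializableClass.doSomething(o);
    }
}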
Then your code will look like this:
stream.map(new NonSerializableFieldMapFunction())
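If it helps to see why the transient field plus open() combination works, here is a minimal sketch that uses only the JDK and no Flink at all. It assumes that shipping a function to a worker behaves roughly like a Java serialization round trip. The names TransientOpenDemo, NonSerializableFieldMapper, and ship are made up for this illustration, the SomeUnserializableClass here is a stand-in for yours, and the no-argument open() is a simplification of the real Flink hook.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientOpenDemo {

    // Hypothetical stand-in for the non-serializable class from the question.
    static class SomeUnserializableClass {
        static int instancesCreated = 0;
        SomeUnserializableClass() { instancesCreated++; }
        String doSomething(Object o) { return "processed:" + o; }
    }

    // Simplified stand-in for a rich function: serializable, with an open() hook.
    static class NonSerializableFieldMapper implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: left out of the serialized form, so it is null after shipping
        private transient SomeUnserializableClass helper;

        void open() { helper = new SomeUnserializableClass(); }

        Object map(Object o) { return helper.doSomething(o); }
    }

    // Serialize and deserialize the function (assumption: this approximates
    // what happens when the function is sent to a worker).
    static NonSerializableFieldMapper ship(NonSerializableFieldMapper fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (NonSerializableFieldMapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Shipping succeeds because the transient field is not serialized.
        NonSerializableFieldMapper shipped = ship(new NonSerializableFieldMapper());
        shipped.open(); // the helper is built once, on the "worker" side
        for (int i = 0; i < 3; i++) {
            System.out.println(shipped.map(i));
        }
        System.out.println("instances created: " + SomeUnserializableClass.instancesCreated);
    }
}
```

Without the transient keyword, the writeObject call would fail with a NotSerializableException as soon as the field held an instance. With it, the helper is built exactly once in open() and reused for every element.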
P.S.: I'm using Java syntax; please adapt it to Scala.