
Does the Flink DataStream API have something like mapPartition?


I want to use a non-serializable object in stream.map(), like this:

stream.map { i =>
  val obj = new SomeUnserializableClass()
  obj.doSomething(i)
}

This is very inefficient, because it creates a new SomeUnserializableClass instance for every element. Ideally, it would be created only once per worker.

In Spark, I can use mapPartitions for this, but I don't know how to do the same with the Flink streaming API.


Solution

  • If you are dealing with a non-serializable class, I recommend creating a RichFunction; in your case, a RichMapFunction.

    A rich function in Flink has an open() method that is called once per parallel instance of the operator, on the TaskManager, before any elements are processed, so it works as an initializer.

    So the trick is to mark the field transient, so that it is skipped when Flink serializes the function, and to instantiate it in open().

    See the example below:

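    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Input and output types are Object only because the question does not say what
    // SomeUnserializableClass consumes and returns; use the real types instead.
    public class NonSerializableFieldMapFunction extends RichMapFunction<Object, Object> {

        // transient: excluded when the function is serialized and shipped to the TaskManagers
        private transient SomeUnserializableClass someUnserializableClass;

        // Newer Flink releases deprecate this overload in favor of open(OpenContext)
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Runs once per parallel instance, before the first call to map()
            this.someUnserializableClass = new SomeUnserializableClass();
        }

        @Override
        public Object map(Object o) throws Exception {
            // Reuses the single instance created in open()
            return someUnserializableClass.doSomething(o);
        }
    }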
    

    Then your code will look like this:

    stream.map(new NonSerializableFieldMapFunction())
    

    P.S.: I'm using Java syntax; please adapt it to Scala.
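    The Flink-specific parts need a running job to observe, but the mechanism this answer relies on can be checked with plain Java serialization. The sketch below is a hypothetical, JDK-only stand-in and not Flink's API: roundTrip roughly mimics how the function object is serialized and shipped to each parallel instance, open() plays the role of RichFunction.open, and runSubtask plays one parallel instance. It shows that the transient field arrives as null, that open() creates one resource per copy rather than one per element, and that a non-transient field of a non-serializable type makes serialization fail.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class TransientOpenDemo {

    // Hypothetical stand-in for the question's non-serializable resource.
    public static class SomeUnserializableClass {
        public static int instancesCreated = 0;
        public SomeUnserializableClass() { instancesCreated++; }
        public String doSomething(String s) { return s.toUpperCase(); }
    }

    // Hypothetical stand-in for a rich map function (NOT Flink's RichMapFunction).
    public static class NonSerializableFieldMapFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        // transient: skipped by Java serialization, so it is null after deserialization
        private transient SomeUnserializableClass resource;

        public void open() { resource = new SomeUnserializableClass(); }
        public String map(String value) { return resource.doSomething(value); }
        public boolean isOpen() { return resource != null; }
    }

    // Counter-example: a non-transient field of a non-serializable type.
    public static class EagerMapFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final SomeUnserializableClass resource = new SomeUnserializableClass();
    }

    // Serialize and deserialize, roughly like shipping a function to a worker.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // One simulated parallel instance: its own copy, open() once, then map() per element.
    public static List<String> runSubtask(NonSerializableFieldMapFunction prototype,
                                          List<String> input) {
        NonSerializableFieldMapFunction fn = roundTrip(prototype);
        fn.open();
        List<String> out = new ArrayList<>();
        for (String s : input) {
            out.add(fn.map(s));
        }
        return out;
    }

    // True if serializing the eager variant fails with NotSerializableException.
    public static boolean eagerFieldFailsToSerialize() {
        try {
            roundTrip(new EagerMapFunction());
            return false;
        } catch (UncheckedIOException e) {
            return e.getCause() instanceof NotSerializableException;
        }
    }

    public static void main(String[] args) {
        NonSerializableFieldMapFunction prototype = new NonSerializableFieldMapFunction();
        List<String> first = runSubtask(prototype, List.of("a", "b", "c"));
        List<String> second = runSubtask(prototype, List.of("d", "e"));
        System.out.println(first + " " + second);                     // [A, B, C] [D, E]
        System.out.println(SomeUnserializableClass.instancesCreated); // 2
        System.out.println(eagerFieldFailsToSerialize());             // true
    }
}
```

    With two simulated instances and five elements, only two resources are created. In a real Flink job the number would follow the operator's parallelism, not the number of records.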