My Flink application throws the following exception when it starts:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: org.apache.ibatis.binding.MapperProxy@3fe8ad3f is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:171)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:91)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1606)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:178)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1240)
at com.pdd.service.koduck.realtime.flink.operators.baseLayer.PlanProcessor.afterProcess(PlanProcessor.java:90)
at com.pdd.service.koduck.realtime.flink.operators.baseLayer.Processor.process(Processor.java:43)
at com.pdd.service.koduck.realtime.flink.Runner.run(Runner.java:49)
at com.pdd.service.koduck.realtime.flink.Main.main(Main.java:27)
Caused by: java.io.NotSerializableException: org.mybatis.spring.SqlSessionTemplate
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:153)
... 11 more
I think the reason is that one of my SinkFunctions references a MyBatis Mapper object, which in turn references a non-serializable SqlSessionTemplate.
This is my sink function:
public class MySinkFunction2<T> extends RichSinkFunction<List<PlanDailyTable.Row>> {
    private PlanDailyDtoMapper mapper;

    public MySinkFunction2(PlanDailyDtoMapper mapper) {
        this.mapper = mapper;
    }

    @Override
    public void invoke(List<PlanDailyTable.Row> value, Context context) throws Exception {
        mapper.insertMultiRow(value);
    }
}
How can I solve this problem? Any help would be appreciated.
Rather than setting the Mapper object in the constructor, you can create it in the sink's open method and make the Mapper field transient.
The sink's constructor is called on the Flink client, and the sink then has to be serialized and sent to the task managers. The sink's open method, by contrast, is called on the task managers, once per parallel sink instance, as the job begins.
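To illustrate why this works, here is a minimal sketch that uses only plain Java serialization, with no Flink or MyBatis on the classpath. All names in it (TransientSinkDemo, FakeMapper, BrokenSink, FixedSink) are hypothetical stand-ins: FakeMapper plays the role of the non-serializable Mapper, and FixedSink.open() plays the role of the sink's open method. In a real job, open would presumably obtain the Mapper from your own MyBatis or Spring setup, which is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TransientSinkDemo {

    // Hypothetical stand-in for the MyBatis mapper: deliberately NOT Serializable.
    static class FakeMapper {
        final List<String> inserted = new ArrayList<>();

        void insertMultiRow(List<String> rows) {
            inserted.addAll(rows);
        }
    }

    // Mirrors the failing sink: the mapper is an ordinary field set in the constructor,
    // so Java serialization tries to write it and fails.
    static class BrokenSink implements Serializable {
        private final FakeMapper mapper;

        BrokenSink(FakeMapper mapper) {
            this.mapper = mapper;
        }
    }

    // Mirrors the suggested fix: the field is transient (skipped by serialization)
    // and is only created in open(), i.e. after the object has been shipped.
    static class FixedSink implements Serializable {
        private transient FakeMapper mapper;

        // Plays the role of the sink's open method on the task manager.
        void open() {
            mapper = new FakeMapper();
        }

        void invoke(List<String> rows) {
            mapper.insertMultiRow(rows);
        }

        FakeMapper mapper() {
            return mapper;
        }
    }

    // Serializes an object to bytes, roughly what happens before a function is shipped.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Restores an object from bytes, roughly what happens on the receiving side.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            serialize(new BrokenSink(new FakeMapper()));
        } catch (NotSerializableException e) {
            System.out.println("BrokenSink could not be serialized: " + e.getMessage());
        }

        // The fixed sink survives the round trip; its mapper is created afterwards.
        FixedSink shipped = (FixedSink) deserialize(serialize(new FixedSink()));
        shipped.open();
        shipped.invoke(Arrays.asList("row-1", "row-2"));
        System.out.println("FixedSink inserted " + shipped.mapper().inserted.size() + " rows");
    }
}
```

The same idea carries over to the sink in the question: drop the constructor argument, declare the mapper field transient, and assign it inside open. Because a transient field comes back as null after deserialization, it must be assigned in open before invoke uses it.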