When I submit new flink job, it throws
Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 24 more
this is my code:
JedisCluster jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
DataStream<MobileClickEvent> clickEventDataStream = environment.addSource(clickConsumer);
clickEventDataStream
.filter(Objects::nonNull)
.keyBy(new KeySelector<MobileClickEvent, String>() {
@Override
public String getKey(MobileClickEvent value) throws Exception {
return value.getItemId() + "_" + value.getItemType();
}
})
.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
@Override
public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
String key = ctx.getCurrentKey();
jedisCluster.hincrBy("{item_feature}" + key, "click", 1);
jedisCluster.expire("{item_feature}" + key, 60 * 10);
}
});
In OP's answer, jedisCluster
will be initialized for each element.
Consider overriding open
as well, and initialize there.
Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work.
.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
private JedisCluster jedisCluster;
@Override
public void open(Configuration parameters) {
jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
}
@Override
public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
String key = ctx.getCurrentKey();
jedisCluster.hincrBy(REDIS_PREFIX + key, "click", 1);
jedisCluster.expire(REDIS_PREFIX + key, 60 * 10);
}
});