Search code examples
javaapache-flinkjedis

Flink: java.io.NotSerializableException: redis.clients.jedis.JedisCluster


When I submit new flink job, it throws

Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 24 more

this is my code:

    JedisCluster jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);


    DataStream<MobileClickEvent> clickEventDataStream = environment.addSource(clickConsumer);


    clickEventDataStream
            .filter(Objects::nonNull)
            .keyBy(new KeySelector<MobileClickEvent, String>() {
                @Override
                public String getKey(MobileClickEvent value) throws Exception {
                    return value.getItemId() + "_" + value.getItemType();
                }
            })
            .process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
                @Override
                public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
                    String key = ctx.getCurrentKey();
                    jedisCluster.hincrBy("{item_feature}" + key, "click", 1);
                    jedisCluster.expire("{item_feature}" + key, 60 * 10);
                }
            });

Solution

  • In OP's answer, jedisCluster will be initialized for each element.

    Consider overriding open as well, and initialize there.

    Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work.

    .process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
        private JedisCluster jedisCluster;
    
        @Override
        public void open(Configuration parameters) {
            jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
        }
    
        @Override
        public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
            String key = ctx.getCurrentKey();
            jedisCluster.hincrBy(REDIS_PREFIX + key, "click", 1);
            jedisCluster.expire(REDIS_PREFIX + key, 60 * 10);
        }
    });