Search code examples
apache-flinkrestorecheckpoint

Why flink can't restore from a savepoint


version flink 1.7

im trying to make a flink job restore from a savepoint(or checkpoint), what the job do is reading from kafka -> do a 30-minutes-window aggregation(like a counter) -> sink to kafka.

i use rocksdb and enabled checkpoint.

now i try to trigger a savepoint manually. the expected value of each aggregated one is 30(1 data/per minute). but when i restore from a savepoint(flink run -d -s {url}), the aggregated value is not 30(less than 30, depends on the time i cancel flink job and restore). when the job run normally, it gets 30.

i don't know why could some data seems to be lost?

and a log shows "No restore state for FlinkKafkaConsumer"

main code:

        source.flatMap(new FlatMapFunction<String, Model>() {
        private static final long serialVersionUID = 5814342517597371470L;

        @Override
        public void flatMap(String value, Collector<Model> out) throws Exception {
            LOGGER.info("----> catch value: " + value);
            Model model =  JSONObject.parseObject(value, Model.class);
            out.collect(model);
        }
    }).uid("flatmap-1").name("flatmap-1").assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Model>() {

        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public long extractTimestamp(Model element, long previousElementTimestamp) {
            return element.getTime();
        }

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Model lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }).setParallelism(1).keyBy(Model::getDim).window(new DynamicWindowAssigner()).aggregate(new AggregateFunction<Model, Model, Model>() {
        @Override
        public Model createAccumulator() {
            return new Model();
        }

        @Override
        public Model add(Model value, Model accumulator) {
            init(value, accumulator);
            accumulator.setValue(accumulator.getValue() + 1);
            return accumulator;
        }

        @Override
        public Model getResult(Model accumulator) {
            return accumulator;
        }

        @Override
        public Model merge(Model a, Model b) {
            return null;
        }

        private void init(Model value, Model accumulator){
            if(accumulator.getTime() == 0L){
                accumulator.setValue(0);
                accumulator.setDim(value.getDim());
                accumulator.setTime(value.getTime());
            }
        }
    }).uid("agg-1").name("agg-1").map(new MapFunction<Model, String>() {
        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public String map(Model value) throws Exception {
            value.setTime(TimeWindow.getWindowStartWithOffset(value.getTime(), 0, TimeUnit.MINUTES.toMillis(30)));
            return JSONObject.toJSONString(value);
        }
    }).uid("flatmap-2").name("flatmap-2").setParallelism(4).addSink(metricProducer).uid("sink").name("sink").setParallelism(2);

checkpoint settings:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(120000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
    StateBackend stateBackend = new RocksDBStateBackend(${path}, true);
    env.setStateBackend(stateBackend);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();

Solution

  • finally it turns out i should use flink run -s {savepoint} -d xxx.jar instesad of flink run -d xxx.jar -s {savepoint}, if "-d" flag is in front of "-s" flag , then flink ignore "-s" somehow