I'm trying to log the state values that are recovered when I start a Flink app from a savepoint. I can see that the app is correctly started from the savepoint, but I can't log the recovered state values.
The app calculates how many products were bought.
Can I use CheckpointedFunction this way? context.isRestored() returns true, but checkpointedState is always empty. What am I misunderstanding?
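```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProductStatisticFunction extends RichFlatMapFunction<User, ProductStatistic> implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(ProductStatisticFunction.class);
    private transient ValueState<ProductStatistic> statisticsValueState;
    private transient ListState<ProductStatistic> checkpointedState;

    @Override
    public void flatMap(User user, Collector<ProductStatistic> out) throws Exception {
        ProductStatistic currentValue = statisticsValueState.value();
        if (currentValue.getAge() == 0) {
            currentValue.setAge(user.getAge());
        }
        currentValue.setAmount(currentValue.getAmount() + 1);
        statisticsValueState.update(currentValue);
        out.collect(new ProductStatistic(currentValue.getId(), currentValue.getAge(), currentValue.getAmount()));
    }

    @Override
    public void open(Configuration config) {
        // Per-key statistic; a fresh ProductStatistic is the default for keys that have no value yet.
        ValueStateDescriptor<ProductStatistic> descriptor = new ValueStateDescriptor<>(
                "age-statistic", TypeInformation.of(new TypeHint<ProductStatistic>() {}), new ProductStatistic());
        statisticsValueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Registers a keyed ListState named "restored" (a different name from "age-statistic").
        this.checkpointedState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("restored", ProductStatistic.class));
        if (context.isRestored()) {
            for (ProductStatistic entry : this.checkpointedState.get()) {
                log.warn("....");
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Required by CheckpointedFunction; the keyed ValueState is snapshotted by Flink itself.
    }
}
```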
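I'm afraid that's not how this works. The ListState named "restored" is a separate piece of state that your job never writes to, so there is nothing in the savepoint to restore into it; the restored statistics live under the "age-statistic" name, one value per key. And there's no straightforward way to see the key/value pairs that are being restored to the age-statistic ValueState.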
You could, however, use the State Processor API to examine the state stored in the savepoint. Or you could change ProductStatisticFunction into a KeyedBroadcastProcessFunction and use applyToKeyedState to examine the live state values at any time while the job is running.
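For the State Processor API route, a minimal sketch could look like the following. It assumes Flink 1.17 or 1.18 (where `SavepointReader` and `OperatorIdentifier` are available), that the stream is keyed by a `String`, and that the operator running `ProductStatisticFunction` was given the hypothetical uid `product-statistics`; adjust all three to your job. It needs the `flink-state-processor-api` dependency and your own `ProductStatistic` class on the classpath, and the reader's descriptor must use the same state name and type as the original.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class InspectSavepoint {

    // Emits one line per key found in the savepoint's "age-statistic" ValueState.
    // ASSUMPTION: the job keys by String; change the first type parameter to match your keyBy.
    static class AgeStatisticReader extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<ProductStatistic> statisticsValueState;

        @Override
        public void open(Configuration parameters) {
            // Same name and type as the descriptor in ProductStatisticFunction.open().
            statisticsValueState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("age-statistic", ProductStatistic.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            ProductStatistic value = statisticsValueState.value();
            out.collect(key + " -> age=" + value.getAge() + ", amount=" + value.getAmount());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // args[0]: path to the savepoint directory.
        SavepointReader savepoint = SavepointReader.read(env, args[0], new HashMapStateBackend());

        // ASSUMPTION: hypothetical uid; it must match the .uid(...) set on the original operator.
        DataStream<String> restored = savepoint.readKeyedState(
                OperatorIdentifier.forUid("product-statistics"), new AgeStatisticReader());

        restored.print();
        env.execute("inspect-savepoint");
    }
}
```

This runs as a separate batch-style job against the savepoint, so it shows exactly what a restore would load without touching the running application.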
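The KeyedBroadcastProcessFunction option could look roughly like this. It is a sketch under several assumptions, not a drop-in replacement: the stream is keyed by a `String`, the broadcast side is a hypothetical stream of `String` commands whose only job is to trigger a dump, and logging goes through SLF4J. Each broadcast element makes `applyToKeyedState` visit every key held by that parallel instance, including values restored from the savepoint. To keep the existing state, the operator's uid, the state name `age-statistic`, and its type must stay unchanged.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ASSUMPTIONS: String key; the String broadcast input is a hypothetical "dump state" trigger.
public class ProductStatisticFunction
        extends KeyedBroadcastProcessFunction<String, User, String, ProductStatistic> {

    private static final Logger log = LoggerFactory.getLogger(ProductStatisticFunction.class);

    // Kept as a field so the same descriptor can be handed to applyToKeyedState.
    private final ValueStateDescriptor<ProductStatistic> descriptor =
            new ValueStateDescriptor<>("age-statistic", ProductStatistic.class);

    private transient ValueState<ProductStatistic> statisticsValueState;

    @Override
    public void open(Configuration config) {
        statisticsValueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(User user, ReadOnlyContext ctx, Collector<ProductStatistic> out) throws Exception {
        ProductStatistic currentValue = statisticsValueState.value();
        if (currentValue == null) {
            // Replaces the deprecated default-value constructor of ValueStateDescriptor.
            currentValue = new ProductStatistic();
        }
        if (currentValue.getAge() == 0) {
            currentValue.setAge(user.getAge());
        }
        currentValue.setAmount(currentValue.getAmount() + 1);
        statisticsValueState.update(currentValue);
        out.collect(new ProductStatistic(currentValue.getId(), currentValue.getAge(), currentValue.getAmount()));
    }

    @Override
    public void processBroadcastElement(String command, Context ctx, Collector<ProductStatistic> out) throws Exception {
        // Walks every key in this parallel instance's keyed state and logs its current value.
        ctx.applyToKeyedState(descriptor, (String key, ValueState<ProductStatistic> state) -> {
            ProductStatistic value = state.value();
            log.warn("key={} age={} amount={}", key, value.getAge(), value.getAmount());
        });
    }
}
```

The broadcast side can be created with `DataStream.broadcast(MapStateDescriptor...)` and attached with `KeyedStream.connect(...)` before calling `process(new ProductStatisticFunction())`.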