Search code examples
scalastateapache-flinkflink-streaming

Flink state empty (reinitialized) after rerun


I'm trying to connect two streams, first is persisting in MapValueState: RocksDB save data in checkpoint folder, but after new run, state is empty. I run it locally and in flink cluster with cancel submiting in cluster and simply rerun locally

 env.setStateBackend(new RocksDBStateBackend(..)
 env.enableCheckpointing(1000)
 ...

   val productDescriptionStream: KeyedStream[ProductDescription, String] = env.addSource(..)
  .keyBy(_.id)

 val productStockStream: KeyedStream[ProductStock, String] = env.addSource(..)
    .keyBy(_.id)

and

  productDescriptionStream
  .connect(productStockStream)
  .process(ProductProcessor())
  .setParallelism(1)

env.execute("Product aggregator")

ProductProcessor

case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product]{
private[this] lazy val stateDescriptor: MapStateDescriptor[String, ProductDescription] =
new MapStateDescriptor[String, ProductDescription](
  "productDescription",
  createTypeInformation[String],
  createTypeInformation[ProductDescription]
)
private[this] lazy val states: MapState[String, ProductDescription] = getRuntimeContext.getMapState(stateDescriptor)

override def processElement1(value: ProductDescription,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,out: Collector[Product]
 ): Unit = {
  states.put(value.id, value)
 }}

 override def processElement2(value: ProductStock,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]
 ): Unit = {
  if (states.contains(value.id)) {
         val product =Product(
          id = value.id,
          description = Some(states.get(value.id).description),
          stock = Some(value.stock),
          updatedAt = value.updatedAt)
        out.collect(product )
 }}

Solution

  • Checkpoints are created by Flink for recovering from failures, not for resuming after a manual shutdown. When a job is canceled, the default behavior is for Flink to delete the checkpoints. Since the job can no longer fail, it won't need to recover.

    You have several options:

    (1) Configure your checkpointing to retain checkpoints when a job is cancelled:

    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

    Then when you restart the job you'll need to indicate that you want it restarted from a specific checkpoint:

    flink run -s <checkpoint-path> ...
    

    Otherwise, whenever you start a job it will begin with an empty state backend.

    (2) Instead of canceling the job, use stop with savepoint:

    flink stop [-p targetDirectory] [-d] <jobID>
    

    after which you'll again need to use flink run -s ... to resume from the savepoint.

    Stop with a savepoint is a cleaner approach than relying on there being a recent checkpoint to fall back to.

    (3) Or you could use Ververica Platform Community Edition, which raises the level of abstraction to the point where you don't have to manage these details yourself.