Tags: apache-flink, flink-streaming, rocksdb

Apache Flink checkpointing stuck


We are running a job whose ListState is between 300 GB and 400 GB in total, and a single list can sometimes grow to a few thousand items. In our use case every item must have its own TTL, so we create a new Timer for every new item of this ListState, using a RocksDB state backend with checkpoints on S3.

This currently amounts to 140+ million timers (which will trigger at event.timestamp + 40 days).
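
Roughly, the per-item timer registration looks like the sketch below (simplified; the Event type, the function name and the cleanup logic in onTimer are illustrative rather than our exact code):

    import java.time.Duration

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Hypothetical event type standing in for our real payload.
    case class Event(name: String, timestamp: Long, payload: String)

    // Simplified version of what we do: buffer each item in ListState and
    // register one event-time timer per item, 40 days after its timestamp.
    class PerItemTtlFunction extends KeyedProcessFunction[String, Event, Event] {

      private val ttlMillis: Long = Duration.ofDays(40).toMillis

      private var items: ListState[Event] = _

      override def open(parameters: Configuration): Unit = {
        items = getRuntimeContext.getListState(
          new ListStateDescriptor[Event]("items", classOf[Event]))
      }

      override def processElement(
          event: Event,
          ctx: KeyedProcessFunction[String, Event, Event]#Context,
          out: Collector[Event]): Unit = {
        items.add(event)
        // One timer per item, firing at event.timestamp + 40 days.
        ctx.timerService().registerEventTimeTimer(event.timestamp + ttlMillis)
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[String, Event, Event]#OnTimerContext,
          out: Collector[Event]): Unit = {
        // Drop expired items and rewrite the remaining ones back to the list.
        val remaining = new java.util.ArrayList[Event]()
        items.get().forEach { e =>
          if (e.timestamp + ttlMillis > timestamp) {
            remaining.add(e)
          }
        }
        items.update(remaining)
      }
    }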

Our problem is that checkpointing of the job suddenly gets stuck, or becomes VERY slow (like 1% progress in a few hours), until it eventually times out. It generally stalls (the Flink dashboard shows 0/12 (0%) while the previous lines show 12/12 (100%)) on a piece of code that is pretty simple:

[...]
    val myStream = env.addSource(someKafkaConsumer)
      .rebalance
      .map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
      .uid("src_kafka_stream")
      .name("some_name")

    myStream.process(new MonitoringProcessFunction())
      .uid("monitoring_uuid")
      .name(monitoring_name)
      .getSideOutput(outputTag)
      .keyBy(_.name)
      .addSink(sink)
[...]

A few more details:

  • AT_LEAST_ONCE checkpointing mode seems to get stuck more easily than EXACTLY_ONCE
  • A few months ago the state grew to 1.5 TB of data and, I think, billions of timers, without any issue.
  • RAM, CPU and networking on the machines running both task managers look normal
  • state.backend.rocksdb.thread.num = 4
  • The first incident happened right after we received a flood of events (millions within minutes), but that was not the case for the other incident.
  • All of the events come from Kafka topics.
  • When in AT_LEAST_ONCE checkpointing mode, the job still runs and consumes normally.

This is the second time this has happened to us: the topology runs perfectly fine with a few million events per day and then suddenly stops checkpointing. We have no idea what could be causing this.

Can anyone think of what could suddenly cause the checkpointing to get stuck?


Solution

A few thoughts:

If you have many timers all firing more-or-less simultaneously, this storm of timers will prevent anything else from happening -- the tasks will loop calling onTimer until there are no more timers to be fired, during which time their input queues will be ignored, and checkpoint barriers will not progress.

If this is the cause of your troubles, you might add some random jitter to your timers so that event storms don't turn into timer storms later on. Reorganizing things to use State TTL might be another option.
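
For example, a minimal sketch of the jitter idea, reusing the hypothetical Event type from the question's sketch (the 6-hour window is just an illustration to tune; state handling is omitted for brevity):

    import java.time.Duration
    import java.util.concurrent.ThreadLocalRandom

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Sketch: same per-item timer registration as in the question, but with up
    // to 6 hours of random jitter so that a burst of events does not become a
    // burst of timers all firing together 40 days later.
    class JitteredTimerFunction extends KeyedProcessFunction[String, Event, Event] {

      private val ttlMillis: Long = Duration.ofDays(40).toMillis
      private val maxJitterMillis: Long = Duration.ofHours(6).toMillis // tune this

      override def processElement(
          event: Event,
          ctx: KeyedProcessFunction[String, Event, Event]#Context,
          out: Collector[Event]): Unit = {
        val jitter = ThreadLocalRandom.current().nextLong(maxJitterMillis)
        ctx.timerService().registerEventTimeTimer(event.timestamp + ttlMillis + jitter)
      }
    }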

If you have a lot of timers on the heap, this can lead to very high GC overhead. This won't necessarily fail the job, but can make checkpointing unstable. In this case, moving the timers into RocksDB may help.
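
If that applies here, one way to move the timers into RocksDB (so they are stored in the state backend rather than kept on the JVM heap) is the timer-service factory option in flink-conf.yaml; whether this is already the default depends on your Flink version, so check the docs:

    # flink-conf.yaml: store timers in RocksDB instead of on the heap
    state.backend.rocksdb.timer-service.factory: ROCKSDB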

Also: since you are using RocksDB, switching from ListState to MapState, with time as the key, would let you remove single entries without having to reserialize the entire list after each update. (With RocksDB, each key/value pair in a MapState is a separate RocksDB object.) Making the cleanup more efficient in this way might be the best remedy.
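
As a rough sketch of that reorganization (again with the hypothetical Event type from the question's sketch; this assumes at most one item per timestamp per key):

    import java.time.Duration

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Sketch: keep items in a MapState keyed by timestamp, so expiring one item
    // is a single RocksDB delete instead of rewriting the whole list.
    class MapStateTtlFunction extends KeyedProcessFunction[String, Event, Event] {

      private val ttlMillis: Long = Duration.ofDays(40).toMillis

      private var itemsByTime: MapState[java.lang.Long, Event] = _

      override def open(parameters: Configuration): Unit = {
        itemsByTime = getRuntimeContext.getMapState(
          new MapStateDescriptor[java.lang.Long, Event](
            "itemsByTime", classOf[java.lang.Long], classOf[Event]))
      }

      override def processElement(
          event: Event,
          ctx: KeyedProcessFunction[String, Event, Event]#Context,
          out: Collector[Event]): Unit = {
        itemsByTime.put(event.timestamp, event)
        ctx.timerService().registerEventTimeTimer(event.timestamp + ttlMillis)
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[String, Event, Event]#OnTimerContext,
          out: Collector[Event]): Unit = {
        // The timer fired at entryTimestamp + ttl, so the expired entry's key
        // is timestamp - ttlMillis; removing it touches only that one entry.
        itemsByTime.remove(timestamp - ttlMillis)
      }
    }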