apache-flink flink-streaming

How might I implement a map of maps in Flink keyed state that supports fast insert, lookup and iteration of nested maps?


I'd like to write a Flink streaming operator that maintains, say, 1500-2000 maps per key, with each map containing perhaps 100,000s of elements of ~100B each. Most records will trigger inserts and reads, but I’d also like to support occasional fast iteration of entire nested maps.

I've written a KeyedProcessFunction that creates 1500 RocksDB-backed MapStates per key, and tested it by generating a stream of records with a single distinct key, but I find it performs poorly. Just initialising all of them takes on the order of several minutes, and once data begin to flow, async incremental checkpoints frequently fail due to timeout. Is this a reasonable approach? If not, what alternative(s) should I consider?

Thanks!

Functionally my code is along the lines of:

val stream = env.fromCollection(new Iterator[(Int, String)] with Serializable {
  override def hasNext: Boolean = true

  override def next(): (Int, String) = {
    (1, randomString())
  }
})

stream
  .keyBy(_._1)
  .process(new KPF())
  .writeUsingOutputFormat(...)

class KPF extends KeyedProcessFunction[Int, (Int, String), String] {

  var states: Array[MapState[Int, String]] = _

  override def processElement(
    value: (Int, String),
    ctx: KeyedProcessFunction[Int, (Int, String), String]#Context,
    out: Collector[String]
  ): Unit = {
    if (states(0).isEmpty) {
      // insert 0-300,000 random strings <= 100B
    }

    val state = states(random.nextInt(1500))
    // Read from R random keys in state
    // Write to W random keys in state
    // With probability 0.01 iterate entire contents of state
    if (random.nextInt(100) == 0) {
      state.iterator().forEachRemaining { entry =>
        // do something trivial with entry
      }
    }
  }

  override def open(parameters: Configuration): Unit = {
    states = (0 until 1500).map { stateId =>
      getRuntimeContext.getMapState(new MapStateDescriptor[Int, String](stateId.toString, classOf[Int], classOf[String]))
    }.toArray
  }
}

Solution

  • There's nothing in what you've described that's an obvious explanation for poor performance. You are already doing the most important thing, which is to use MapState<K, V> rather than ValueState<Map<K, V>>. This way each key/value pair in the map is a separate RocksDB object, rather than the entire Map being one RocksDB object that has to go through ser/de for every access/update for any of its entries.

    To understand the performance better, the next step might be to enable the RocksDB native metrics, and study those for clues. RocksDB is quite tunable, and better performance may be achievable. E.g., you can tune for your expected mix of reads and writes, and if you are trying to access keys that don't exist, then you should enable Bloom filters (which are turned off by default).

    The RocksDB state backend has to go through ser/de for every state access/update, which is certainly expensive. You should consider whether you can optimize the serializer; some serializers can be 2-5x faster than others. (Some benchmarks.)

    Also, you may want to investigate the new spillable heap state backend that is being developed. See https://flink-packages.org/packages/spillable-state-backend-for-flink, https://cwiki.apache.org/confluence/display/FLINK/FLIP-50%3A+Spill-able+Heap+Keyed+State+Backend, and https://issues.apache.org/jira/browse/FLINK-12692. Early benchmarking suggests this state backend is significantly faster than RocksDB, as it keeps its working state as objects on the heap and spills cold objects to disk. (How much this would help probably depends on how often you have to iterate.)

    And if you don't need to spill to disk, the FsStateBackend would be faster still.
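
To make the MapState versus ValueState point above concrete, here is a rough, Flink-free sketch of how many bytes a single update has to serialize in each layout. It uses plain Java serialization as a stand-in for a state serializer, which is an assumption made only for illustration (Flink uses its own TypeSerializers), and the names SerdeCostSketch, bytesWrittenWholeMap and bytesWrittenSingleEntry are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerdeCostSketch {
  // Stand-in serializer (assumption): Java serialization into a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.toByteArray
  }

  // ValueState[Map[K, V]] style: one update re-serializes the whole map.
  def bytesWrittenWholeMap(map: Map[Int, String], key: Int, value: String): Int =
    serialize(map.updated(key, value)).length

  // MapState[K, V] style: one update serializes only the touched key and value.
  def bytesWrittenSingleEntry(key: Int, value: String): Int =
    serialize(Int.box(key)).length + serialize(value).length

  def main(args: Array[String]): Unit = {
    // 10,000 entries of roughly 100 B each, far smaller than the maps in the question.
    val map = (0 until 10000).map(i => i -> ("v" * 100)).toMap
    val whole = bytesWrittenWholeMap(map, 42, "x" * 100)
    val single = bytesWrittenSingleEntry(42, "x" * 100)
    println(s"whole-map update: $whole bytes, single-entry update: $single bytes")
  }
}
```

The whole-map cost grows with the size of the map, while the per-entry cost stays roughly constant, which is why the per-entry layout matters for maps with hundreds of thousands of elements.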
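
For the RocksDB suggestions above (native metrics and Bloom filters), the following is a minimal configuration sketch. It assumes a Flink version in which these option keys exist; in particular, the Bloom filter key is assumed to be available only in Flink 1.15 and later, and older versions would likely need a custom RocksDB options factory instead. The object and method names are hypothetical, and the same keys could instead go into flink-conf.yaml.

```scala
import org.apache.flink.configuration.Configuration

object RocksDbTuningSketch {
  // Builds a Configuration carrying the RocksDB options discussed above.
  def rocksDbTuning(): Configuration = {
    val conf = new Configuration()
    // Native metrics: each enabled metric adds some overhead, so start with a few.
    conf.setString("state.backend.rocksdb.metrics.estimate-num-keys", "true")
    conf.setString("state.backend.rocksdb.metrics.block-cache-usage", "true")
    // Assumption: this key exists in Flink 1.15+; check the docs for your version.
    conf.setString("state.backend.rocksdb.use-bloom-filter", "true")
    conf
  }
}
```

Which metrics are worth enabling depends on the symptom being investigated, so treat the two shown here as examples rather than a recommended set.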