
Apache Flink RocksDB state management


I'm reading two Kafka topics in the same Flink job.

  • Stream1: Messages coming from the first topic are saved to RocksDB, then the stream is unioned with stream2.
  • Stream2: Messages coming from the second topic are enriched with the state saved by stream1, then the stream is unioned with stream1.

Topic1 and topic2 are different sources, but the output is basically the same for both. I just have to enrich the data coming from topic2 with the data coming from topic1.

Here is the flow:

val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)

Here are my questions:

  1. Is this flow a good approach?
  2. Can stream2 access the state saved by stream1 for the same memberId?

Solution

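  • It seems you should be able to achieve exactly what you want by using a KeyedCoProcessFunction. Keyed state in Flink is scoped to the operator that registers it, so the map on stream2 cannot read the state written by the map on stream1, even for the same memberId. Connecting the two keyed streams lets a single operator handle both inputs. It would look more or less like this: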

    stream1
    .keyBy(_.memberId)
    .connect(stream2.keyBy(_.memberId))
    .process(new CustomKeyedCoProcessFunction())
    

    This way the state lives in a single KeyedCoProcessFunction, so it is accessible when processing elements from both stream1 and stream2.

    So, in processElement1 you can do what you are currently doing inside the map for stream1, and in processElement2 you can do what you are currently doing inside the map for stream2.
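A minimal sketch of what CustomKeyedCoProcessFunction could look like, assuming the Flink 1.x Scala DataStream API and a String memberId. The record types MemberDetails, Event and Output are hypothetical stand-ins for whatever the two topics actually contain. The member details are kept in keyed ValueState, which Flink stores in RocksDB when the RocksDB state backend is configured.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical record types: replace them with the real types read from the two topics.
case class MemberDetails(memberId: String, name: String) // from topic1
case class Event(memberId: String, payload: String)      // from topic2
// Hypothetical common output type produced for both inputs.
case class Output(memberId: String, payload: Option[String], memberName: Option[String])

class CustomKeyedCoProcessFunction
    extends KeyedCoProcessFunction[String, MemberDetails, Event, Output] {

  // Keyed state: one MemberDetails per memberId, shared by processElement1 and processElement2
  // because both run inside this single operator.
  private lazy val memberState: ValueState[MemberDetails] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[MemberDetails]("member-details", classOf[MemberDetails]))

  // Elements from stream1 (topic1): save the member details, then emit them.
  override def processElement1(
      details: MemberDetails,
      ctx: KeyedCoProcessFunction[String, MemberDetails, Event, Output]#Context,
      out: Collector[Output]): Unit = {
    memberState.update(details)
    out.collect(CustomKeyedCoProcessFunction.fromDetails(details))
  }

  // Elements from stream2 (topic2): read the state stored for the same memberId and enrich.
  override def processElement2(
      event: Event,
      ctx: KeyedCoProcessFunction[String, MemberDetails, Event, Output]#Context,
      out: Collector[Output]): Unit = {
    // value() returns null if no topic1 record has been seen yet for this key.
    val details = Option(memberState.value())
    out.collect(CustomKeyedCoProcessFunction.enrich(event, details))
  }
}

object CustomKeyedCoProcessFunction {
  // Pure helpers kept outside the operator so they can be unit-tested without a Flink cluster.
  def fromDetails(details: MemberDetails): Output =
    Output(details.memberId, payload = None, memberName = Some(details.name))

  def enrich(event: Event, details: Option[MemberDetails]): Output =
    Output(event.memberId, Some(event.payload), details.map(_.name))
}
```

Because one operator now emits the common Output type for both inputs, the union is no longer needed: the result of `.process(new CustomKeyedCoProcessFunction())` can go straight to `.addSink(kafkaProducer)`. One design point remains open. If a topic2 event arrives before any topic1 record for that memberId, the state is still empty. This sketch emits such events unenriched (memberName = None), but you could instead buffer them in state until the member details arrive.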