I'm reading two Kafka topics in the same Flink job.
Stream1: Messages from the first topic are saved to RocksDB, then unioned with stream2.
Stream2: Messages from the second topic are enriched with the state saved by stream1, then unioned with stream1.
Topic1 and topic2 are different sources, but the output is basically the same for both. I just have to enrich the data coming from topic2 with the data coming from topic1.
Here is the flow:
val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)
Here is the question:
Can stream2 access the state that was saved by stream1 for the same memberId?

It seems you should be able to achieve exactly what you want by using a KeyedCoProcessFunction. It would look more or less like this:
stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())
This way you keep the state in a single KeyedCoProcessFunction, so you can access it for both stream1 and stream2.
So in processElement1 you can do the same thing you are doing inside map for stream1, and in processElement2 you can do the same thing you are doing inside map for stream2.
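
For illustration, here is a minimal sketch of what such a CustomKeyedCoProcessFunction might look like. It assumes Flink 1.x (where `open(Configuration)` is available) and a String memberId. The record types MemberDetails, MemberEvent and Output, their fields, and the state name "member-details" are hypothetical placeholders, not taken from the question; substitute your own types and the logic you already have in the two map functions.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical record types; the real ones come from your two Kafka topics.
case class MemberDetails(memberId: String, name: String)
case class MemberEvent(memberId: String, payload: String)
case class Output(memberId: String, name: Option[String], payload: Option[String])

class CustomKeyedCoProcessFunction
    extends KeyedCoProcessFunction[String, MemberDetails, MemberEvent, Output] {

  // Keyed state: one MemberDetails value per memberId, shared by both inputs.
  @transient private var details: ValueState[MemberDetails] = _

  override def open(parameters: Configuration): Unit = {
    details = getRuntimeContext.getState(
      new ValueStateDescriptor[MemberDetails]("member-details", classOf[MemberDetails]))
  }

  // Input 1 (topic1): save the details for the current key, then emit a record.
  override def processElement1(
      value: MemberDetails,
      ctx: KeyedCoProcessFunction[String, MemberDetails, MemberEvent, Output]#Context,
      out: Collector[Output]): Unit = {
    details.update(value)
    out.collect(Output(value.memberId, Some(value.name), None))
  }

  // Input 2 (topic2): read the details saved for the same key and enrich.
  override def processElement2(
      value: MemberEvent,
      ctx: KeyedCoProcessFunction[String, MemberDetails, MemberEvent, Output]#Context,
      out: Collector[Output]): Unit = {
    // value() returns null if nothing has been stored for this key yet.
    val saved = Option(details.value())
    out.collect(Output(value.memberId, saved.map(_.name), Some(value.payload)))
  }
}
```

Because both methods emit the same output type, the result of process(...) can go straight to the sink and the separate union is no longer needed. Note that Flink does not guarantee any ordering between the two inputs, so a topic2 record may arrive before the matching topic1 record; the sketch simply emits it unenriched in that case, and you may prefer to buffer it in state instead.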