
Update concurrent map inside a stream map on flink


I have one stream that constantly streams the latest values for some keys.

Stream A:DataStream[(String,Double)]

I have another stream that needs to read the latest value on each process call.

My approach was to introduce a ConcurrentHashMap which would be updated by stream A and read by the second stream.

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

val rates = new ConcurrentHashMap[String, Double]().asScala
val streamA: DataStream[(String, Double)] = ???
streamA.map(keyWithValue => rates(keyWithValue._1) = keyWithValue._2) // rates never gets updated
rates("testKey") = 2 // this works
val streamB: DataStream[String] = ???
streamB.map(str => rates(str)) // rates does not contain the values of streamA at this point
//some other functionality

Is it possible to update a concurrent map from a stream? Any other solution for sharing data from one stream with another is also acceptable.


Solution

  • The approach you are trying to use will not work in a distributed setting: as soon as you have parallelism > 1, it breaks down. In your code, rates actually does get updated, but in a different instance of the parallel operator than the one reading it.

    What you actually want here is BroadcastState, which was designed to solve exactly the problem you are facing.

    In your specific use case, it would look something like this:

    val streamA: DataStream[(String, Double)] = ???
    val streamABroadcasted = streamA.broadcast(<Your Map State Definition>)
    val streamB: DataStream[String] = ???
    streamB.connect(streamABroadcasted)
    

    Then you can use a BroadcastProcessFunction to implement your logic. More on the broadcast state pattern can be found in the Flink documentation.
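
    To make this concrete, here is a minimal sketch of how the two streams could be wired together with a MapStateDescriptor and a BroadcastProcessFunction. It assumes the Flink Scala API; the descriptor name "rates" and the output format are illustrative, not from the original post:

    import org.apache.flink.api.common.state.MapStateDescriptor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
    import org.apache.flink.util.Collector

    // State descriptor describing the broadcast map: key -> latest rate
    val ratesDescriptor = new MapStateDescriptor[String, Double](
      "rates", createTypeInformation[String], createTypeInformation[Double])

    val streamA: DataStream[(String, Double)] = ???
    val streamB: DataStream[String] = ???

    streamB
      .connect(streamA.broadcast(ratesDescriptor))
      .process(new BroadcastProcessFunction[String, (String, Double), String] {

        // Broadcast side: every parallel instance receives streamA's
        // elements and stores the latest value per key.
        override def processBroadcastElement(
            value: (String, Double),
            ctx: BroadcastProcessFunction[String, (String, Double), String]#Context,
            out: Collector[String]): Unit =
          ctx.getBroadcastState(ratesDescriptor).put(value._1, value._2)

        // Non-broadcast side: read-only access to the broadcast state.
        override def processElement(
            key: String,
            ctx: BroadcastProcessFunction[String, (String, Double), String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          // The key may not have been broadcast yet, so guard against null
          Option(ctx.getBroadcastState(ratesDescriptor).get(key))
            .foreach(rate => out.collect(s"$key -> $rate"))
        }
      })

    Unlike the ConcurrentHashMap, the broadcast state is replicated to every parallel instance of the operator by Flink itself, so reads on the streamB side always see the values broadcast from streamA regardless of parallelism.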