Flink's broadcast state behavior


I am experimenting with Flink's broadcast state using a simple case.

I just want to multiply an integer stream by another integer that comes from a broadcast stream.

The behavior of my broadcast is "weird": if I put too few elements in my input stream (like 10), nothing happens and my MapState is empty, but if I put in more elements (like 100), I get the behavior I want (here, the integer stream multiplied by 2).

Why is the broadcast stream not taken into account when I provide too few elements?

How can I control when the broadcast stream takes effect?

Optional: I want to keep only the last element of my broadcast stream. Is .clear() the right way to do that?

Thank you!

Here's my BroadcastProcessFunction:

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] {
  override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]): Unit = {
    // Read-only view of the broadcast state, converted explicitly to a Scala collection
    val currentBroadcastState = ctx.getBroadcastState(State.mapState).immutableEntries().asScala
    if (currentBroadcastState.isEmpty) {
      // No broadcast element has arrived yet: forward the value unchanged
      out.collect(value)
    } else {
      out.collect(currentBroadcastState.last.getValue * value)
    }
  }

  override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]) = {
    // Keep only last state
    ctx.getBroadcastState(State.mapState).clear()
    // Add state
    ctx.getBroadcastState(State.mapState).put("key", value)
  }
}

And my MapState:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._

object State {
  val mapState: MapStateDescriptor[String, Int] =
    new MapStateDescriptor(
      "State",
      createTypeInformation[String],
      createTypeInformation[Int]
    )
}

And my Main:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object Broadcast {
  def main(args: Array[String]): Unit = {
    val numberElements = 100
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val broadcastStream = env.fromElements(2).broadcast(State.mapState)
    val input = (1 to numberElements).toList
    val inputStream = env.fromCollection(input)
    val outputStream = inputStream
      .connect(broadcastStream)
      .process(new BroadcastProcess())
    outputStream.print()
    env.execute()
  }
}

Edit: I am using Flink 1.5 and following its Broadcast State documentation.


Solution

  • Flink does not synchronize the ingestion of streams, i.e., each stream produces data as soon as it can. This is true for both regular and broadcast inputs. The BroadcastProcess function does not wait for the first broadcast element to arrive before it starts processing the regular input.

    When you put more numbers into the regular input, it simply takes longer to serialize, deserialize, and serve that input, so the broadcast element is already present by the time the first regular number arrives.
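
    One way to make the result independent of this race is to hold back regular elements until the first broadcast element has been seen. The following is only a minimal sketch of that idea in plain Scala, without any Flink dependency: the class BufferingMultiplier and its methods onElement and onBroadcast are hypothetical names that stand in for processElement and processBroadcastElement. In a real job the buffer would also have to be kept in checkpointed state to survive failures, which this sketch does not attempt.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model of an operator with one regular and one broadcast input.
final class BufferingMultiplier {
  // Latest broadcast value; None until the first broadcast element arrives.
  private var factor: Option[Int] = None
  // Regular elements that arrived before any broadcast element.
  private val pending = ArrayBuffer.empty[Int]

  // Stands in for processElement: emit at once if a factor is known, otherwise buffer.
  def onElement(value: Int): Seq[Int] = factor match {
    case Some(f) => Seq(f * value)
    case None =>
      pending += value
      Seq.empty
  }

  // Stands in for processBroadcastElement: keep only the latest factor and flush the buffer.
  def onBroadcast(value: Int): Seq[Int] = {
    factor = Some(value)
    val flushed = pending.map(_ * value).toList
    pending.clear()
    flushed
  }
}

object BufferingMultiplierDemo {
  def main(args: Array[String]): Unit = {
    val op = new BufferingMultiplier
    // The regular input wins the race: three elements arrive before the broadcast value.
    val early = Seq(1, 2, 3).flatMap(op.onElement)
    val flushed = op.onBroadcast(2)
    val late = Seq(4, 5).flatMap(op.onElement)
    println(s"early=$early flushed=$flushed late=$late")
  }
}
```

    In this model nothing is emitted for the three early elements, they are emitted as 2, 4, 6 once the broadcast value 2 arrives, and later elements are multiplied immediately. Because the factor is simply overwritten on each broadcast element, only the last broadcast value is ever kept.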