Tags: garbage-collection, apache-flink, flink-streaming

Garbage Collection on Flink Applications


I have a very simple Flink application in Scala with two streams. I broadcast one of the streams to the other. The broadcast stream contains rules, and the connected function simply checks whether the other stream's tuples are inside the rule set or not. Everything works fine, and my code is below.

This application runs indefinitely. I wonder whether there is any possibility for the JVM to garbage-collect my rules object.

Does anyone have any idea? Many thanks in advance.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object StreamBroadcasting extends App {
  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
  val stream = env
    .socketTextStream("localhost", 9998)
    .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
    .keyBy(l => l)

  val ruleStream = env
    .socketTextStream("localhost", 9999)
    .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))

  val broadcastStream: DataStream[String] = ruleStream.broadcast

  stream.connect(broadcastStream)
    .flatMap(new SimpleConnect)
    .print

  class SimpleConnect extends RichCoFlatMapFunction[String, String, (String, Boolean)] {
    private var rules: Set[String] = Set.empty[String] // Can JVM collect this object after a long time?

    override def open(parameters: Configuration): Unit = {}

    override def flatMap1(value: String, out: Collector[(String, Boolean)]): Unit = {
      out.collect(value, rules.contains(value))
    }

    override def flatMap2(value: String, out: Collector[(String, Boolean)]): Unit = {
      rules = rules.+(value)
    }
  }

  env.execute("flink-broadcast-streams")
}

Solution

  • No, the Set of rules will not be garbage collected. The operator instance holds a strong reference to the rules field for the lifetime of the job, so it will stick around forever. (Of course, since you're not using Flink's broadcast state, the rules won't survive an application restart.)
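If restart survivability matters, the same logic can be expressed with Flink's broadcast state pattern, where the rules live in checkpointed state instead of a plain field. Below is a minimal sketch under that assumption; the names `rulesDesc` and `RuleMatcher` are illustrative, not from the original code.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

object BroadcastStateSketch {
  // Descriptor for the broadcast state. Broadcast state is a map, so a
  // rule "set" is modeled as String -> Boolean with a dummy TRUE value.
  val rulesDesc = new MapStateDescriptor[String, java.lang.Boolean](
    "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)

  class RuleMatcher
      extends KeyedBroadcastProcessFunction[String, String, String, (String, Boolean)] {

    override def processElement(
        value: String,
        ctx: KeyedBroadcastProcessFunction[String, String, String, (String, Boolean)]#ReadOnlyContext,
        out: Collector[(String, Boolean)]): Unit = {
      // Data side sees a read-only view of the broadcast state.
      out.collect((value, ctx.getBroadcastState(rulesDesc).contains(value)))
    }

    override def processBroadcastElement(
        rule: String,
        ctx: KeyedBroadcastProcessFunction[String, String, String, (String, Boolean)]#Context,
        out: Collector[(String, Boolean)]): Unit = {
      // Rule side gets writable state; it is checkpointed with the job.
      ctx.getBroadcastState(rulesDesc).put(rule, java.lang.Boolean.TRUE)
    }
  }
}
```

Wiring it in would replace the `connect`/`flatMap` pair with something like `stream.connect(ruleStream.broadcast(BroadcastStateSketch.rulesDesc)).process(new BroadcastStateSketch.RuleMatcher).print` — the state then participates in checkpoints and restores, unlike the plain `Set` field.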