Tags: scala, apache-flink, flink-streaming

Flink: Access the key in a RichMapFunction applied to a KeyedStream, to handle a stream keyed by Option


I would like to handle the case where the key is None when I apply a RichMapFunction to a keyed stream.

For example, I have a case class like this:

case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)

acc is the field I would like to compute with my map.

I would like to apply a stateful map to the stream, so I have a RichMapFunction (in this example, an accumulator):

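import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

// Keeps a running sum of `b` per key and writes it into `acc`.
class Accumulator extends RichMapFunction[Foo, Foo] {

  // Keyed state: Flink scopes it to the key of the record being processed.
  private var sum: ValueState[Int] = _

  override def map(input: Foo): Foo = {

    // With a Scala Int, empty state unboxes to 0, so the None branch never fires;
    // the result is still correct because 0 + input.b == input.b.
    val newAcc = Option(sum.value()) match {
      case None => input.b
      case Some(x) => x + input.b
    }
    sum.update(newAcc)
    Foo(input.a, input.b, Some(newAcc))
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("accumulator", createTypeInformation[Int])
    )
  }
}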

Then, my pipeline is executed with:

import org.apache.flink.streaming.api.scala._

object ExampleAccumulator extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromElements(Foo(Some("a"), 1, None), Foo(Some("a"), 2, None), Foo(None, 10, None), Foo(None, 6, None))
    .keyBy(_.a) // the key type is Option[String], so None is a key like any other
    .map(new Accumulator())
    .print()

  env.execute("ExampleAccumulator")
}

The output is:

Foo(Some(a),1,Some(1))
Foo(Some(a),2,Some(3))
Foo(None,10,Some(10))
Foo(None,6,Some(16))
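This output follows from how keyed state works: None is an ordinary key value, so the two None records share one state slot, exactly as the two Some("a") records do. The plain-Scala model below involves no Flink at all; the KeyedStateModel object and its runningSums helper are hypothetical, with an immutable Map standing in for Flink's per-key ValueState. It processes the same four records sequentially and reproduces the same accumulated values.

```scala
// A plain-Scala model of keyed ValueState: one accumulator slot per key.
// Not Flink code; it only mirrors the semantics of keyBy(_.a) + Accumulator.
case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)

object KeyedStateModel {

  // Hypothetical helper: folds over the input, keeping a running sum of `b` per key.
  def runningSums(input: Seq[Foo]): Seq[Foo] = {
    val (_, emitted) =
      input.foldLeft((Map.empty[Option[String], Int], Vector.empty[Foo])) {
        case ((state, out), foo) =>
          // None is treated as an ordinary key here, just like Some("a").
          val newSum = state.getOrElse(foo.a, 0) + foo.b
          (state.updated(foo.a, newSum), out :+ foo.copy(acc = Some(newSum)))
      }
    emitted
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(Foo(Some("a"), 1), Foo(Some("a"), 2), Foo(None, 10), Foo(None, 6))
    runningSums(input).foreach(println)
  }
}
```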

But I would like acc to be None when the key is None.

Is it possible to get the key in a RichMapFunction?


Solution

  • This is not supported for now. The key can be acquired through the getCurrentKey() method of the KeyContext class, which is not exposed to a RichMapFunction. However, Flink provides KeyedProcessFunction, whose processElement method receives a Context parameter that exposes the current key through getCurrentKey(). I believe this is what you want.
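A minimal sketch of the KeyedProcessFunction approach, assuming a Flink 1.x release with the Scala DataStream API (org.apache.flink.streaming.api.scala, as in the question; that API is deprecated in newer releases) whose KeyedProcessFunction.Context provides getCurrentKey. The names KeyAwareAccumulator and ExampleKeyAwareAccumulator are hypothetical. The function is parameterized with the key type Option[String], so ctx.getCurrentKey can be matched directly: records under the None key pass through with acc = None, and every other key accumulates as before.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)

// Type parameters: key = Option[String], input = Foo, output = Foo.
class KeyAwareAccumulator extends KeyedProcessFunction[Option[String], Foo, Foo] {

  // Keyed state, scoped to the key of the element being processed.
  private var sum: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("accumulator", createTypeInformation[Int]))
  }

  override def processElement(
      input: Foo,
      ctx: KeyedProcessFunction[Option[String], Foo, Foo]#Context,
      out: Collector[Foo]): Unit = {
    ctx.getCurrentKey match {
      case None =>
        // No accumulation for the None key: emit the record with acc = None.
        out.collect(input.copy(acc = None))
      case Some(_) =>
        // With a Scala Int, empty state unboxes to 0, so the first sum is input.b.
        val newAcc = sum.value() + input.b
        sum.update(newAcc)
        out.collect(input.copy(acc = Some(newAcc)))
    }
  }
}

object ExampleKeyAwareAccumulator extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromElements(Foo(Some("a"), 1), Foo(Some("a"), 2), Foo(None, 10), Foo(None, 6))
    .keyBy(_.a)
    .process(new KeyAwareAccumulator())
    .print()

  env.execute("ExampleKeyAwareAccumulator")
}
```

With the sample input, the two None records should come out with acc = None, while the Some(a) records still accumulate to 1 and 3. In this particular example the key selector is _.a, so the key of each record is simply input.a, and matching on input.a inside the original RichMapFunction would give the same result without switching APIs; ctx.getCurrentKey avoids recomputing the key when the key selector is more involved.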