I would like to handle None as a key case when I apply a RichMapFunction to a keyed stream.
For example, I have a case class like this:
case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)
acc is the field I would like to compute with my map.
I would like to apply a stateful map on a stream, so I have a RichMapFunction (for example, an accumulator):
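import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

class Accumulator extends RichMapFunction[Foo, Foo] {
  // Keyed state: Flink keeps one running sum per key.
  private var sum: ValueState[Int] = _

  override def map(input: Foo): Foo = {
    // Add the current value to the sum stored for this key, if any.
    val newAcc = Option(sum.value()) match {
      case None => input.b
      case Some(x) => x + input.b
    }
    sum.update(newAcc)
    Foo(input.a, input.b, Some(newAcc))
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("accumulator", createTypeInformation[Int])
    )
  }
}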
Then, my pipeline is executed with:
object ExampleAccumulator extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env
    .fromElements(
      Foo(Some("a"), 1, None),
      Foo(Some("a"), 2, None),
      Foo(None, 10, None),
      Foo(None, 6, None)
    )
    .keyBy(_.a)
    .map(new Accumulator())
    .print()
  env.execute("ExampleAccumulator")
}
The output is:
Foo(Some(a),1,Some(1))
Foo(Some(a),2,Some(3))
Foo(None,10,Some(10))
Foo(None,6,Some(16))
But I would like to get None in acc when the key is None.
Is it possible to get the key in a RichMapFunction?
It's not supported for now. The key can be acquired with the getCurrentKey() method of the KeyContext class, but that class is not exposed in RichMapFunction. However, Flink provides KeyedProcessFunction, whose processElement method receives a Context parameter that exposes the current key through getCurrentKey(). I believe this is what you want.
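A minimal sketch of the KeyedProcessFunction approach, assuming a Flink 1.x release with the Scala DataStream API as used in the question. The names KeyAwareAccumulator and nextAcc are hypothetical and only serve the illustration. The key type is Option[String] because the stream is keyed with keyBy(_.a).

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)

object KeyAwareAccumulator {
  // Hypothetical pure helper: no accumulated value for a None key,
  // otherwise the previous sum (0 if absent) plus the current value.
  def nextAcc(key: Option[String], previous: Option[Int], b: Int): Option[Int] =
    key.map(_ => previous.getOrElse(0) + b)
}

class KeyAwareAccumulator
    extends KeyedProcessFunction[Option[String], Foo, Foo] {

  // Keyed state: one running sum per key, as in the original Accumulator.
  private var sum: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("accumulator", createTypeInformation[Int])
    )
  }

  override def processElement(
      input: Foo,
      ctx: KeyedProcessFunction[Option[String], Foo, Foo]#Context,
      out: Collector[Foo]): Unit = {
    // The Context exposes the key of the element being processed.
    val key = ctx.getCurrentKey
    val newAcc = KeyAwareAccumulator.nextAcc(key, Option(sum.value()), input.b)
    // Only touch the state when there is something to accumulate.
    newAcc.foreach(v => sum.update(v))
    out.collect(input.copy(acc = newAcc))
  }
}
```

In the pipeline, .map(new Accumulator()) would then become .process(new KeyAwareAccumulator()), and the two elements keyed by None should come out with acc set to None. Since the key here is just input.a, checking input.a inside the original map would probably work too; the KeyedProcessFunction is mainly useful when the key cannot be recomputed from the element.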