I am using Flink's KeyedOneInputStreamOperatorTestHarness and calling processElement twice. processElement updates the state to count the number of elements seen, so after calling processElement twice I would expect the state to be 2. The first call sets the state to 1, but when I call processElement the second time, the state is null again. Do you know why this is happening and how I can retain the state between calls to processElement?
Since the state appears to be reset on the second call, I suspect your elements have different keys, so each one is being assigned its own state (and context).
When using a KeyedOneInputStreamOperatorTestHarness, you'll need to ensure that both of the elements you pass to the operator share the same key. Keyed state is scoped to the current key, so only elements with the same key read and update the same state.
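To illustrate why a second element can observe null, here is a plain-Kotlin model of keyed state. It is a hypothetical sketch (KeyedCounterModel, differentKeysDemo and sameKeyDemo are illustrative names, not Flink APIs) and not how Flink actually stores state; it only assumes the behavior described above, namely that each key gets its own slot and an unseen key starts out empty.

```kotlin
// Hypothetical model of per-key ValueState; NOT Flink's real implementation.
// Each distinct key owns its own slot, so state is never shared across keys.
class KeyedCounterModel<K, T>(private val keySelector: (T) -> K) {
    private val state = mutableMapOf<K, Int>()

    // Mirrors the counting logic: read (null for an unseen key), increment, store, emit.
    fun process(element: T): Int {
        val key = keySelector(element)
        val updated = (state[key] ?: 0) + 1
        state[key] = updated
        return updated
    }
}

// Keyed by the string itself: "abc" and "xyz" are different keys,
// so the second element starts from empty state and also counts 1.
fun differentKeysDemo(): List<Int> {
    val model = KeyedCounterModel<String, String> { it }
    return listOf(model.process("abc"), model.process("xyz"))
}

// Keyed by length: both strings map to key 3, so they share one counter.
fun sameKeyDemo(): List<Int> {
    val model = KeyedCounterModel<Int, String> { it.length }
    return listOf(model.process("abc"), model.process("xyz"))
}
```

With the string itself as the key, the two calls return 1 and 1 because the second element starts from empty state; keyed by length, they return 1 and 2.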
Consider the following example function, which simply counts the records it has seen for each key and emits the running count:
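import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

private class StatefulLengthCountFunction : KeyedProcessFunction<Int, String, Int>() {
    // Keyed state: Flink keeps a separate value for each key
    private lateinit var count: ValueState<Int>

    override fun open(parameters: Configuration) {
        count = runtimeContext.getState(ValueStateDescriptor("count", Int::class.java))
    }

    override fun processElement(value: String, ctx: Context, out: Collector<Int>) {
        // value() returns null the first time a given key is seen
        val currentCount = (count.value() ?: 0) + 1
        count.update(currentCount)
        out.collect(currentCount)
    }
}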
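And we'll use the following test to demonstrate this. ProcessFunctionTestHarnesses.forKeyedProcessFunction returns a KeyedOneInputStreamOperatorTestHarness, so the same reasoning applies to the harness you are using: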
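import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses
import org.junit.jupiter.api.Test // or org.junit.Test when using JUnit 4

@Test
fun `test demonstrating stateful counting based on string size`() {
    // Arrange
    val harness = ProcessFunctionTestHarnesses.forKeyedProcessFunction(
        StatefulLengthCountFunction(),
        // Note: the length of the string is used as the key
        { value -> value.length },
        TypeInformation.of(Int::class.java)
    )

    // Act: all three elements have the same length (3), so they share
    // the same key and therefore the same "count" state
    harness.processElement(StreamRecord("abc", 0))
    harness.processElement(StreamRecord("def", 0))
    harness.processElement(StreamRecord("ghi", 0))

    // Assert: the last emitted count reflects all three elements
    val output = harness.extractOutputValues().last()
    assert(output == 3)
}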
So if you expect the state to be reused by subsequent records, you'll want to ensure that those records share the same key. In the above example, this was done by passing the test harness a KeySelector that uses the length of the string as the key:
{ value -> value.length }
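However, your mileage may vary depending on your use case and the shape of your elements: key by whichever field (or combination of fields) identifies the records that should share state.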
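As a sketch of what that might look like with structured elements, suppose (hypothetically) each element is a SensorReading and state should be accumulated per sensor. The key selector then extracts sensorId instead of the string length. The counting below is modeled with a plain map rather than the Flink harness, so SensorReading, bySensorId, countPerKey and sensorDemo are illustrative names only; in an actual harness test you would pass the same lambda as the KeySelector together with a matching key type, e.g. TypeInformation.of(String::class.java).

```kotlin
// Hypothetical element type; your real elements will have their own shape.
data class SensorReading(val sensorId: String, val value: Double)

// Same idea as the key selector passed to the harness: pick the field that
// identifies whose state should be accumulated.
val bySensorId: (SensorReading) -> String = { reading -> reading.sensorId }

// Plain-Kotlin stand-in for a keyed counter: each key has its own count.
fun countPerKey(readings: List<SensorReading>, keySelector: (SensorReading) -> String): List<Int> {
    val counts = mutableMapOf<String, Int>()
    return readings.map { reading ->
        val key = keySelector(reading)
        val updated = (counts[key] ?: 0) + 1 // unseen key starts from null, treated as 0
        counts[key] = updated
        updated // the count emitted for this element
    }
}

fun sensorDemo(): List<Int> = countPerKey(
    listOf(
        SensorReading("s1", 20.5),
        SensorReading("s2", 18.0),
        SensorReading("s1", 21.0) // same key as the first reading, so it shares that state
    ),
    bySensorId
)
```

Here the first and third readings share the key s1, so sensorDemo() yields 1, 1, 2: the third reading sees the count left by the first, while s2 gets its own, independent counter.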