When unit testing Apache Flink with ProcessFunctionTestHarnesses
KeyedOneInputStreamOperatorTestHarness<String, Event, Alert> testHarness = ProcessFunctionTestHarnesses.forKeyedProcessFunction
The testHarness has access to two calls, on two different classes, that return the number of active timers in the keyed process function:
int i = testHarness.getProcessingTimeService().getNumActiveTimers()
int k = testHarness.numProcessingTimeTimers()
I've looked at the Javadoc of both TestProcessingTimeService and InternalTimeServiceManager, but the descriptions don't say much.
What is the difference between the two, and which should be used?
In my KeyedProcessFunction, I set and then delete a processing-time timer. At the end of my unit test, I expect there to be 0 active processing-time timers. From the above snippet, k passes the assertion of being equal to 0, but i fails, with its actual value being 1.
I think the key difference is which service backs each call: one goes through a test-specific time service instance, and the other goes through the internal timer service. Both can likely accomplish the same thing, but their implementations differ.
testHarness.getProcessingTimeService().getNumActiveTimers()
This uses an instance of the TestProcessingTimeService that you mentioned earlier and simply iterates through a queue to count the entries that aren't "done".
testHarness.numProcessingTimeTimers()
This uses the actual instance of the InternalTimeServiceManager<K>, as opposed to a test-specific timer object. The harness method simply delegates to it, per the source:
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        if (timeServiceManager != null) {
            return timeServiceManager.numProcessingTimeTimers();
        } else {
            throw new UnsupportedOperationException();
        }
    }
The manager, in turn, sums the counts of its internal timer services, per the source:
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }
In my experience, when writing tests around time and timers, deleting a timer may not be enough on its own: you may also need to advance time after the deletion so that the change propagates to the underlying service (e.g. advance time past the deleted timer so that the underlying service recognizes that it is no longer "active").
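To illustrate the point about advancing time, here is a toy model in plain Java. It is not Flink source code: the class names `PhysicalTimeService` and `LogicalTimerService` are hypothetical, and the model rests on my assumption that deleting a timer removes only the operator-level entry, while the callback already handed to the time service stays queued until time passes it. Under that assumption, the two counts disagree right after the deletion and agree again once time has advanced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model (NOT Flink code) of two layers that each keep their own timer count. */
public class TimerCountModel {

    /** Stands in for a test time service: holds scheduled callbacks until time passes them. */
    static class PhysicalTimeService {
        private static final class Task {
            final long timestamp;
            final Runnable callback;
            boolean done;

            Task(long timestamp, Runnable callback) {
                this.timestamp = timestamp;
                this.callback = callback;
            }
        }

        private final List<Task> queue = new ArrayList<>();

        void schedule(long timestamp, Runnable callback) {
            queue.add(new Task(timestamp, callback));
        }

        /** Counts queue entries that are not "done" yet. */
        int getNumActiveTimers() {
            int count = 0;
            for (Task task : queue) {
                if (!task.done) {
                    count++;
                }
            }
            return count;
        }

        /** Advances time and runs every pending callback that is now due. */
        void setCurrentTime(long newTime) {
            // Iterate over a copy so a callback may schedule new tasks safely.
            for (Task task : new ArrayList<>(queue)) {
                if (!task.done && task.timestamp <= newTime) {
                    task.done = true;
                    task.callback.run();
                }
            }
        }
    }

    /** Stands in for the operator-level timer service: tracks the user's timers. */
    static class LogicalTimerService {
        private final TreeSet<Long> timers = new TreeSet<>();
        private final PhysicalTimeService physical;
        final List<Long> fired = new ArrayList<>();

        LogicalTimerService(PhysicalTimeService physical) {
            this.physical = physical;
        }

        void register(long timestamp) {
            if (timers.add(timestamp)) {
                physical.schedule(timestamp, () -> onWakeUp(timestamp));
            }
        }

        /** ASSUMPTION: deletion drops the logical entry only; the callback stays queued. */
        void delete(long timestamp) {
            timers.remove(timestamp);
        }

        /** Fires a timer only if it is still registered when the callback runs. */
        private void onWakeUp(long timestamp) {
            if (timers.remove(timestamp)) {
                fired.add(timestamp);
            }
        }

        int numProcessingTimeTimers() {
            return timers.size();
        }
    }

    public static void main(String[] args) {
        PhysicalTimeService physical = new PhysicalTimeService();
        LogicalTimerService logical = new LogicalTimerService(physical);

        logical.register(100L);
        logical.delete(100L);
        System.out.println(logical.numProcessingTimeTimers()); // prints 0
        System.out.println(physical.getNumActiveTimers());     // prints 1

        physical.setCurrentTime(100L); // the stale callback runs and fires nothing
        System.out.println(physical.getNumActiveTimers());     // prints 0
        System.out.println(logical.fired.isEmpty());           // prints true
    }
}
```

If the real services behave like this model, the count from the timer service manager is the one that reflects what your function registered, and in the real harness the equivalent of `setCurrentTime` would be a call such as `testHarness.setProcessingTime(...)` before asserting on the other count.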
It's also entirely possible that this is a bug in the implementation of the TestProcessingTimeService that results in it not immediately recognizing the deletion of the timer prior to your assertion.