We have a execute() method where we use FlinkKafkaConsumer08 as our Flink CEP source, then we have CEP pattern and alert is again going in another kafka topic. How can I write a junit test case for this execute() method? Can anyone please provide me a sample junit code for this?
Pattern.<WebConnectionUseCase>begin("start")
.where(new SimpleCondition<WebConnectionUseCase>() {
public boolean filter(WebConnectionUseCase event) {
return ((event.getValues().getPredictedAvailableMemory()
- event.getValues().getAvailableMemory()) >= STARTDIFF);
}
}).followedBy("middle").where(new IterativeCondition<WebConnectionUseCase>() {
public boolean filter(WebConnectionUseCase value, Context<WebConnectionUseCase> ctx)
throws Exception {
Iterable<WebConnectionUseCase> middleStops = ctx.getEventsForPattern("middle");
List<Double> diffMemoryList = new ArrayList<Double>();
List<Double> connectionList = new ArrayList<Double>();
middleStops.forEach(item -> diffMemoryList.add(item.getValues().getPredictedAvailableMemory()
- item.getValues().getAvailableMemory()));
middleStops.forEach(item -> connectionList.add(item.getValues().getConnection()));
return checkIncreasingPattern(diffMemoryList) && checkDecreasingPattern(connectionList);
}
private boolean checkDecreasingPattern(List<Double> list) {
//code
}
private boolean checkIncreasingPattern(List<Double> list) {
// code
}
}).times(PATTERNCOUNT).consecutive().next("end").where(new SimpleCondition<WebConnectionUseCase>() {
@Override
public boolean filter(WebConnectionUseCase event) {
return ((event.getValues().getPredictedAvailableMemory()
- event.getValues().getAvailableMemory()) >= ENDDIFF);
}
}).within(Time.minutes(TIMEOUTDURATION));
I would encapsulate the part you want to test in an object that you can connect to special sources and sinks for testing, and to the live data sources/sinks for production.
For the test sink, you could use this:
public static class TestSink<OUT> implements SinkFunction<OUT> {
// must be static
public static final List values = new ArrayList<>();
@Override
public void invoke(OUT value, Context context) throws Exception {
values.add(value);
}
}
Then your tests can compare sink.values to the expected results.
It's easier to write tests that do event time processing (compared to those that use processing time), since with processing time the results are not deterministic. And it's easier to have tests that run with a parallelism of 1, also for the sake of having deterministic results.
You will find some examples of tests here.