Testing Flink streams with windowAll


I have been trying to test a windowAll operation in Apache Flink that uses TumblingProcessingTimeWindows, but when I retrieve the results from the sink, it always contains 0 elements. Using TumblingProcessingTimeWindows is mandatory and part of the spec, so I cannot switch to a window triggered by events. Specifically, I have the following flow:

public void executeStream() {
    var stream = env.addSource(convertToSourceFunction)
            .returns(new TypeHint<SomeObject>() {})
            .windowAll(some window of type TumblingProcessingTimeWindows)
            .process(a class of type ProcessAllWindowFunction<SomeObject, ListOfObjects, TimeWindow>)
            .addSink(myKafkaSink);
    env.execute();
}

To test the code above, I create the following resources:

private FromElementsFunction<SomeObject> convertToSourceFunction(SomeObject element) {
    def hint = new TypeHint<SomeObject>(){}
    return new FromElementsFunction<>(hint.typeInfo.createSerializer(env.getConfig()), element)
}

class MyKafkaSink implements SinkFunction<ListOfObjects> {
    static def values = []

    @Override
    void invoke(ListOfObjects value, Context context) throws Exception {
        values << value
    }
}

When I assert that the sink holds exactly one element, the following check evaluates to false:

MyKafkaSink.values.size == 1 //real value is 0

Of course, if I don't use window functions, everything works as expected. What is the proper way to test a case like this? I am using Spock to test Java code.


Solution

  • The sink most likely stays empty because the source emits its elements and finishes right away, so the job ends before the processing-time window has a chance to fire. To test something like this, you can (a) use a very short window size, and (b) use a custom source that first sends out the N test events and then delays for longer than the window size before returning.

    This is still not completely deterministic, but it's generally pretty reliable. To harden it, you could have the sink increment an atomic counter each time it receives an event, and have the source delay until that counter reaches the target value. You'd still need to run the workflow asynchronously and apply a timeout (the error case) in case it doesn't complete soon enough.
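
The hardened variant above can be sketched in plain Java, without any Flink dependency, to show how the counter, the waiting source and the timeout fit together. This is only a minimal sketch: `WindowTestCoordination`, `RECEIVED`, `emitThenWait` and `completesWithin` are hypothetical names, the `Consumer` stands in for whatever the custom source uses to emit elements, and the `Callable` stands in for the code that runs the job.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper, not part of Flink: models the counter + timeout idea.
class WindowTestCoordination {

    // Incremented by the test sink for every result it receives.
    // Static so that source and sink code in the same JVM share it.
    static final AtomicInteger RECEIVED = new AtomicInteger();

    // Stand-in for the body of a custom source: emit all test events, then
    // stay alive until the sink has counted the expected number of results.
    static <T> void emitThenWait(List<T> events, Consumer<T> out, int expectedResults)
            throws InterruptedException {
        for (T event : events) {
            out.accept(event);
        }
        while (RECEIVED.get() < expectedResults) {
            Thread.sleep(10); // keep the source open so the window can fire
        }
    }

    // Stand-in for the test side: run the job on another thread and report
    // whether it finished before the timeout (false is the error case).
    static boolean completesWithin(Callable<Void> job, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> future = executor.submit(job);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("job failed", e);
        } finally {
            executor.shutdownNow(); // interrupts a source that is still waiting
        }
    }
}
```

In an actual test, the loop in `emitThenWait` would presumably live in the `run` method of the custom source, the increment of `RECEIVED` would go into `MyKafkaSink.invoke`, and the `Callable` passed to `completesWithin` would call `executeStream()`. The counter has to be static (or otherwise shared) because Flink serializes the source and sink instances before running them, which is also why the question's sink collects into a static list.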