apache-flink

Flink TestHarness output doesn't clear after collect


I have the following test:

testHarness.processElement2(new StreamRecord<>(element1));
testHarness.processElement1(new StreamRecord<>(new Tuple2<>(id, element2)));

testHarness.setProcessingTime(1); // assume this is the time at which the function's first timer fires
softly.assertThat(testHarness.getOutput()).containsExactly(new StreamRecord<>(expectedResult)); // passes

testHarness.setProcessingTime(2); // advance time so that a second, different timer fires
softly.assertThat(testHarness.getOutput()).containsExactly(new StreamRecord<>(expectedResult2)); // fails because the output contains both expectedResult and expectedResult2

Why doesn't the TestHarness clear its elements once we call getOutput()? Is there a way to get this behavior?


Solution

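  • getOutput() does not drain anything: it returns the harness's live output queue (a ConcurrentLinkedQueue<Object>), which keeps accumulating every record the operator emits. To assert only on newer records, call clear() on that queue between assertions: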

    testHarness.processElement2(new StreamRecord<>(element1));
    testHarness.processElement1(new StreamRecord<>(new Tuple2<>(id, element2)));
    
    testHarness.setProcessingTime(1); // assume this is the time at which the function's first timer fires
    softly.assertThat(testHarness.getOutput()).containsExactly(new StreamRecord<>(expectedResult)); // passes
    
    testHarness.getOutput().clear(); // discard everything emitted so far
    
    testHarness.setProcessingTime(2); // advance time so that a second, different timer fires
    softly.assertThat(testHarness.getOutput()).containsExactly(new StreamRecord<>(expectedResult2)); // passes
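
If the read-then-clear pattern repeats across many assertions, it may be convenient to fold both steps into one small helper. The sketch below is only an illustration: `OutputDrain` and `drain` are hypothetical names, not part of Flink, and a plain `ConcurrentLinkedQueue` stands in for the queue returned by `testHarness.getOutput()` so the example runs without any Flink dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical test helper (not a Flink class).
final class OutputDrain {
    private OutputDrain() {}

    /** Removes every element currently in the queue and returns them in insertion order. */
    static <T> List<T> drain(Queue<T> queue) {
        List<T> drained = new ArrayList<>();
        T next;
        // poll() returns null once the queue is empty; ConcurrentLinkedQueue never holds nulls.
        while ((next = queue.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    public static void main(String[] args) {
        // Stand-in for the queue returned by testHarness.getOutput().
        Queue<Object> output = new ConcurrentLinkedQueue<>();

        output.add("expectedResult");        // simulates the first timer firing
        List<Object> first = drain(output);  // read and empty the queue in one step

        output.add("expectedResult2");       // simulates the second timer firing
        List<Object> second = drain(output); // holds only what was emitted since the last drain

        System.out.println(first);
        System.out.println(second);
    }
}
```

In the test above, each assertion would then read `softly.assertThat(OutputDrain.drain(testHarness.getOutput())).containsExactly(...)`, so every check sees only the records emitted since the previous one.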