I have a simple Flink application that sums up events with the same id and timestamp within the last minute:
DataStream<String> input = env
.addSource(consumerProps)
.uid("app");
DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));
pixels
.keyBy("id", "timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(dynamoDBSink);
env.execute(jobName);
I am trying to test this application using the approach recommended in the documentation. I have also looked at this Stack Overflow question, but adding the sink didn't help.
I have a @ClassRule in my test class, as recommended. The test function looks like this:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
CollectSink.values.clear();
Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.keyBy("id", "timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());
CollectSink is copied from the documentation.
What am I doing wrong? Is there also a simple way to test the application with an embedded Kafka?
Thanks!
Your test fails because the window is never triggered: the job runs to completion before the window reaches the end of its allotted time.
The reason for this has to do with the way you are working with time. By specifying
.keyBy("id","timestampRoundedToMinutes")
you are arranging for all events with the same id and with timestamps within the same minute to be grouped together. But because you are using processing-time windows (rather than event-time windows), your windows won't close until the wall-clock time at which the test is running crosses the boundary from one minute to the next. With only four events to process, your job is highly unlikely to run long enough for this to happen.
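To put a number on "highly unlikely", here is a small plain-Java model (no Flink involved) of a one-minute processing-time tumbling window in a short-lived job. It assumes what the explanation above describes: an event arriving at wall-clock time t lands in the window [start, start + 1 min), the window's timer is set for its last millisecond, and the window only fires if the job is still running at that moment. The class name, method names and the 200 ms job runtime are hypothetical.

```java
import java.time.Duration;
import java.util.Locale;

/** Hypothetical plain-Java model of a processing-time tumbling window in a short job; not Flink code. */
class ProcessingTimeModel {

    /** Start of the tumbling window containing time t (offset 0, t >= 0). */
    static long windowStart(long t, long size) {
        return t - (t % size);
    }

    /**
     * An event arriving at wall-clock time arrivalMillis lands in [start, start + size).
     * The window's timer is for its last millisecond (end - 1); in this model the window
     * fires only if the job is still running at that moment.
     */
    static boolean fires(long arrivalMillis, long jobEndMillis, long size) {
        long windowEnd = windowStart(arrivalMillis, size) + size;
        return jobEndMillis >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis(); // 60_000 ms
        long jobDuration = 200;                        // assumed runtime of the test job, in ms
        long start = 12 * size + 5_000;                // job starts 5 s into some minute
        System.out.println(fires(start, start + jobDuration, size)); // prints false
        // Approximate share of start times for which a 200 ms job crosses a minute boundary
        System.out.println(String.format(Locale.ROOT, "%.2f%%", 100.0 * jobDuration / size)); // prints 0.33%
    }
}
```

In other words, a test like this would pass only if it happened to start in roughly the last fifth of a second of some minute.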
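What you should do instead is something more like this: set the time characteristic to event time, and provide a timestamp extractor and watermark assigner. Note that by doing this, there's no need to key by the timestamp rounded to minute boundaries; grouping events into one-minute windows is part of what event-time windows do anyway. It also makes the test deterministic: when a bounded source such as fromElements is exhausted, Flink emits a final watermark of Long.MAX_VALUE, which fires every event-time window that is still open before the job finishes.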
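To see why keying by the rounded timestamp becomes redundant, here is a plain-Java sketch of how a one-minute tumbling window (offset 0) maps a timestamp to its window start. It mirrors the arithmetic of tumbling window assignment for non-negative timestamps, but it is an illustration only: the class and method names are hypothetical, not Flink's API.

```java
import java.time.Instant;

/** Hypothetical illustration of one-minute tumbling window assignment (offset 0); not Flink's API. */
class WindowAssignment {
    static final long SIZE = 60_000L; // one minute in milliseconds

    /** Start of the tumbling window that contains timestamp ts (valid for ts >= 0). */
    static long windowStart(long ts) {
        return ts - (ts % SIZE);
    }

    public static void main(String[] args) {
        long a = Instant.parse("2020-02-26T12:19:05Z").toEpochMilli();
        long b = Instant.parse("2020-02-26T12:19:58Z").toEpochMilli();
        long c = Instant.parse("2020-02-26T12:20:01Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(a))); // prints 2020-02-26T12:19:00Z
        System.out.println(windowStart(a) == windowStart(b));     // prints true: same minute, same window
        System.out.println(windowStart(a) == windowStart(c));     // prints false: next minute, next window
    }
}
```

Because every timestamp within the same minute already lands in the same window, keying by id alone is enough.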
public static void main(String[] args) throws Exception {
...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
.keyBy("id")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
env.execute();
}
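private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
// Assumption: timestampRoundedToMinutes is a UTC timestamp in the "yyyyMMddHHmm" format
private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
public TimestampsAndWatermarks() {
// delay to handle out-of-orderness; 0 is only an illustrative value for in-order test data
super(Time.seconds(0));
}
@Override
public long extractTimestamp(Pixel pixel) {
// Assumes Pixel exposes a getter for this field; returns epoch milliseconds
return LocalDateTime.parse(pixel.getTimestampRoundedToMinutes(), FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
}
}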
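As a sanity check on what the event-time version should produce for the four test pixels, here is a plain-Java model (no Flink dependency) of keyBy("id") plus one-minute event-time tumbling windows plus sum("constant"). It assumes the rounded timestamp string is a UTC "yyyyMMddHHmm" value, uses a hypothetical stand-in for the Pixel class, and treats a window as fired once the watermark reaches its last millisecond; passing Long.MAX_VALUE models the final watermark emitted when fromElements runs out.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Flink) of keyBy("id") + 1-minute event-time tumbling windows + sum("constant"). */
class EventTimeModel {
    static final long SIZE = 60_000L; // window size: one minute in ms
    // Assumption: timestampRoundedToMinutes is a UTC timestamp in the "yyyyMMddHHmm" format
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    /** Hypothetical stand-in for the question's Pixel, with only the fields used here. */
    static final class Pixel {
        final int id;
        final String timestampRoundedToMinutes;
        final int constant;

        Pixel(int id, String timestampRoundedToMinutes, int constant) {
            this.id = id;
            this.timestampRoundedToMinutes = timestampRoundedToMinutes;
            this.constant = constant;
        }
    }

    /** What a timestamp extractor would return for this pixel: epoch milliseconds. */
    static long timestamp(Pixel p) {
        return LocalDateTime.parse(p.timestampRoundedToMinutes, FORMAT)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /**
     * Sums "constant" per (id, window) and keeps only the windows the given watermark
     * fires, i.e. windows whose last millisecond (end - 1) is <= watermark.
     */
    static Map<String, Integer> fire(List<Pixel> pixels, long watermark) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Pixel p : pixels) {
            long ts = timestamp(p);
            long start = ts - (ts % SIZE);
            if (start + SIZE - 1 <= watermark) {
                String window = LocalDateTime.ofEpochSecond(start / 1000, 0, ZoneOffset.UTC).format(FORMAT);
                sums.merge(p.id + "@" + window, p.constant, Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Pixel> pixels = Arrays.asList(
                new Pixel(1, "202002261219", 1), new Pixel(2, "202002261220", 1),
                new Pixel(1, "202002261219", 1), new Pixel(3, "202002261220", 1));
        // Long.MAX_VALUE models the final watermark emitted when the bounded source finishes
        System.out.println(fire(pixels, Long.MAX_VALUE));
        // prints {1@202002261219=2, 2@202002261220=1, 3@202002261220=1}
    }
}
```

With the final watermark every window fires, so the CollectSink in the test should receive three results, with the two id-1 pixels summed to 2.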
See the documentation and the tutorials for more about event time, watermarks, and windowing.