I have the following situation:
    stream<Tuple2<String, Integer>>
        .keyBy(0)
        .timeWindow(Time.of(10, TimeUnit.SECONDS))
        .sum(1)
        .flatMap(..)
        .sink()
What I am trying to do is calculate the top N for each time window. The top N of each window is then stored by the sink.
I can calculate the top N in the flatMap, but I do not know when to send it to the sink for storage. As far as I can see, there is no way to know from within the flatMap function when the window has ended.
I know there are alternatives, such as an apply function that does both, or inserting markers into the stream to signal the end of a window, but I am wondering whether there is a more elegant solution.
If you want to calculate the top N for each window over all keys, then you should apply a `timeWindowAll` with the same window length after the keyed sum, and calculate the top N in its `apply` method. You could do something like:
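    import java.util.Comparator;
    import java.util.PriorityQueue;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // stream is a DataStream<Tuple2<String, Integer>>
    final int n = 10;

    stream
        .keyBy(0)
        // per-key sum over 10-second tumbling windows
        .timeWindow(Time.of(10L, TimeUnit.SECONDS))
        .sum(1)
        // collect all per-key sums of the same 10-second window into one non-keyed window
        .timeWindowAll(Time.of(10L, TimeUnit.SECONDS))
        .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
                // min-heap ordered by count: the smallest of the current top n sits at the head
                PriorityQueue<Tuple2<String, Integer>> priorityQueue = new PriorityQueue<>(n, new Comparator<Tuple2<String, Integer>>() {
                    @Override
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        // Integer.compare avoids the overflow that o1.f1 - o2.f1 can cause
                        return Integer.compare(o1.f1, o2.f1);
                    }
                });

                for (Tuple2<String, Integer> value : values) {
                    priorityQueue.offer(value);
                    // evict the smallest count once more than n elements are held
                    while (priorityQueue.size() > n) {
                        priorityQueue.poll();
                    }
                }

                // emit the top n of this window (iteration order of a PriorityQueue is unspecified)
                for (Tuple2<String, Integer> stringIntegerTuple2 : priorityQueue) {
                    out.collect(stringIntegerTuple2);
                }
            }
        })
        .print();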
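Running the Flink job needs a Flink environment, so here is a plain-Java sketch with no Flink dependency of what the two stages compute. It assumes tumbling windows aligned to timestamp 0 and event timestamps in milliseconds. The names `WindowedTopN`, `Event`, and `topNPerWindow` are hypothetical, chosen only for illustration. Stage 1 mirrors the keyed `sum(1)` per window, and stage 2 mirrors the bounded min-heap in the `AllWindowFunction`. Because iterating a `PriorityQueue` does not give a sorted order, the survivors are sorted explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

public class WindowedTopN {

    /** Hypothetical input event: a key, a count, and an event timestamp in milliseconds. */
    static final class Event {
        final String key;
        final int count;
        final long timestampMs;

        Event(String key, int count, long timestampMs) {
            this.key = key;
            this.count = count;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Stage 1 mimics keyBy(0).timeWindow(size).sum(1): sum the counts per key in each tumbling window.
     * Stage 2 mimics timeWindowAll(size).apply(...): keep the n largest sums of each window.
     * Returns window start -> top-n entries, largest first.
     */
    static Map<Long, List<Map.Entry<String, Integer>>> topNPerWindow(List<Event> events, long windowSizeMs, int n) {
        // Stage 1: window start -> (key -> summed count); windows are aligned to timestamp 0.
        Map<Long, Map<String, Integer>> sums = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            sums.computeIfAbsent(windowStart, w -> new HashMap<>()).merge(e.key, e.count, Integer::sum);
        }

        Map<Long, List<Map.Entry<String, Integer>>> result = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Integer>> window : sums.entrySet()) {
            // Stage 2: bounded min-heap; the head is the smallest sum currently kept.
            PriorityQueue<Map.Entry<String, Integer>> heap =
                    new PriorityQueue<>(Map.Entry.<String, Integer>comparingByValue());
            for (Map.Entry<String, Integer> entry : window.getValue().entrySet()) {
                heap.offer(entry);
                if (heap.size() > n) {
                    heap.poll(); // evict the smallest sum
                }
            }
            // Iterating a PriorityQueue has no defined order, so sort the survivors explicitly.
            List<Map.Entry<String, Integer>> top = new ArrayList<>(heap);
            top.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
            result.put(window.getKey(), top);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 3, 1_000L), new Event("b", 5, 2_000L),
                new Event("a", 4, 3_000L), new Event("c", 1, 4_000L),
                new Event("c", 9, 12_000L), new Event("b", 2, 13_000L),
                new Event("a", 1, 14_000L));
        Map<Long, List<Map.Entry<String, Integer>>> top = topNPerWindow(events, 10_000L, 2);
        for (Map.Entry<Long, List<Map.Entry<String, Integer>>> w : top.entrySet()) {
            System.out.println(w.getKey() + " -> " + w.getValue());
        }
        // prints "0 -> [a=7, b=5]" and then "10000 -> [c=9, b=2]"
    }
}
```

One thing worth checking in the real job: results emitted by a Flink time window carry the window's maximum timestamp, so with event time the keyed sums fall into the matching `timeWindowAll` window. With processing time, the all-window is assigned by arrival time, so the sums of one window may be counted in the following all-window.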