I have messages coming from Kafka into Flink, and I would like to create an EventTimeSessionWindows.withDynamicGap() window whose gap adapts over time to the density of the data. To do this I have to create an enriched message that holds my "Event" plus "the gap", which I have to calculate dynamically.
The enriched message will then be a Tuple2<Event, Long>, where Event is a POJO that contains a CSV record from Kafka [tom, 53, 1.70, 18282822, ...] and Long is the gap parameter in milliseconds [129293838].
Currently this part of my code is:
DataStream<Tuple2<Event, Long>> enriched = stream
    .keyBy((Event ride) -> ride.CorrID)
    .map(new StatefulSessionCalculator());
Where StatefulSessionCalculator() enriches the message by creating the Tuple2 described above.
After this I have to take the calculated gap out using something like this:
DataStream<Tuple2<Event, Long>> result = enriched
    .keyBy((...) -> ride.CorrID)
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
My class DynamicSessionWindows should do the job of feeding the Long back to Flink, but I don't understand how. It would just be a class that implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> and returns the gap from the extract() method.
I have the theory but I would need an example of how to do it.
If anyone can help me with this by putting down some code, it would be really appreciated.
Thanks
Here we go, I found how to do it. It was a simple question, but being new to Java and Flink made me struggle a bit. I have also created a KeySelector:
WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
And my DynamicSessionWindows() is this one:
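import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

public class DynamicSessionWindows implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> {
    @Override
    public long extract(Tuple2<Event, Long> value) {
        // f1 is the gap in milliseconds that StatefulSessionCalculator attached to the event
        return value.f1;
    }
}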
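The gap that extract() returns still has to be computed upstream, in StatefulSessionCalculator. I did not show that part, so here is only a rough sketch of one way it could be done in plain Java: keep an exponential moving average of the time between consecutive events and use a multiple of it as the gap. The class name AdaptiveGap and the parameters alpha, multiplier, minGapMs and maxGapMs are made up for this illustration and are not part of Flink. The lower bound should stay positive, because Flink rejects a dynamic session gap that is not greater than zero.

```java
/**
 * Hypothetical helper (not a Flink class): proposes a session gap from the
 * observed density of events, using an exponential moving average of the
 * time between consecutive events.
 */
public class AdaptiveGap {
    private final double alpha;      // smoothing factor in (0, 1]; higher reacts faster
    private final double multiplier; // gap = multiplier * average inter-arrival time
    private final long minGapMs;     // lower bound, must be > 0
    private final long maxGapMs;     // upper bound, also used before any interval is known

    private long lastTimestampMs = -1L;  // -1 means "no event seen yet"
    private double avgIntervalMs = -1.0; // -1 means "no interval measured yet"

    public AdaptiveGap(double alpha, double multiplier, long minGapMs, long maxGapMs) {
        if (alpha <= 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        if (minGapMs <= 0 || maxGapMs < minGapMs) {
            throw new IllegalArgumentException("need 0 < minGapMs <= maxGapMs");
        }
        this.alpha = alpha;
        this.multiplier = multiplier;
        this.minGapMs = minGapMs;
        this.maxGapMs = maxGapMs;
    }

    /** Feeds one event timestamp (ms) and returns the gap (ms) to attach to that event. */
    public long update(long timestampMs) {
        if (lastTimestampMs >= 0) {
            // Late (out-of-order) events count as a zero interval instead of a negative one.
            long interval = Math.max(0L, timestampMs - lastTimestampMs);
            avgIntervalMs = avgIntervalMs < 0
                    ? interval
                    : alpha * interval + (1.0 - alpha) * avgIntervalMs;
        }
        lastTimestampMs = Math.max(lastTimestampMs, timestampMs);

        if (avgIntervalMs < 0) {
            return maxGapMs; // first event: nothing measured yet, fall back to the upper bound
        }
        long gap = Math.round(multiplier * avgIntervalMs);
        return Math.min(maxGapMs, Math.max(minGapMs, gap));
    }
}
```

In a real job this state would have to be kept per key, for example in Flink keyed state inside a RichMapFunction, rather than in plain fields. The map function would then emit Tuple2.of(event, gap) for every incoming event.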