apache-flink, flink-streaming

Flink: SessionWindowTimeGapExtractor - Compute the gap dynamically using data density


I have messages coming from Kafka into Flink, and I would like to create an EventTimeSessionWindows.withDynamicGap() window whose gap adapts over time to the density of the data. To do this, I have to create an enriched message that holds my "Event" plus "the gap", which I have to calculate dynamically.

The enriched message will then be a Tuple2<Event, Long>, where Event is a POJO that contains a CSV record from Kafka [tom, 53, 1.70, 18282822, ...] and Long is the gap parameter in milliseconds [129293838].

Currently this part of my code is:

 DataStream<Tuple2<Event, Long>> enriched = stream 
                    .keyBy((Event ride) -> ride.CorrID)        
                    .map(new StatefulSessionCalculator());

Where StatefulSessionCalculator() enriches the message, creating the Tuple2 described above.

After this, I have to take the calculated gap out using something like this:
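
The density-based calculation itself is not shown in the question. As a rough illustration only, here is one way the gap could be derived from how quickly events arrive: keep an exponential moving average of the inter-arrival time and scale it. The class name `DensityGapEstimator`, its parameters, and the moving-average approach are all assumptions made for this sketch, not the asker's actual logic. In a real job, the two fields would presumably live in Flink keyed state (for example a `ValueState` inside a `RichMapFunction`) rather than in plain instance fields.

```java
/** Hypothetical helper: derives a session gap from how densely events arrive. */
final class DensityGapEstimator {
    private final double alpha;      // smoothing factor in (0, 1]; higher reacts faster
    private final double multiplier; // gap = multiplier * average inter-arrival time
    private final long minGapMs;     // lower bound for the returned gap
    private final long maxGapMs;     // upper bound, also used before any interval is known

    private boolean seenEvent = false;
    private boolean hasAverage = false;
    private long lastTimestampMs;
    private double avgIntervalMs;

    DensityGapEstimator(double alpha, double multiplier, long minGapMs, long maxGapMs) {
        this.alpha = alpha;
        this.multiplier = multiplier;
        this.minGapMs = minGapMs;
        this.maxGapMs = maxGapMs;
    }

    /** Feeds one event timestamp (ms) and returns the gap (ms) to attach to that event. */
    long nextGap(long timestampMs) {
        if (seenEvent) {
            // Late (out-of-order) events are treated as arriving with zero interval.
            long interval = Math.max(0L, timestampMs - lastTimestampMs);
            avgIntervalMs = hasAverage
                    ? alpha * interval + (1 - alpha) * avgIntervalMs
                    : interval;
            hasAverage = true;
            lastTimestampMs = Math.max(lastTimestampMs, timestampMs);
        } else {
            seenEvent = true;
            lastTimestampMs = timestampMs;
        }
        if (!hasAverage) {
            return maxGapMs; // no density information yet, so stay conservative
        }
        long gap = Math.round(multiplier * avgIntervalMs);
        return Math.min(maxGapMs, Math.max(minGapMs, gap));
    }
}
```

With this sketch, dense data (short intervals) yields a short gap and sparse data yields a long one, bounded by `minGapMs` and `maxGapMs`. The map function would then emit `Tuple2.of(event, estimator.nextGap(eventTimestamp))`.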

DataStream<Tuple2<Event, Long>> result = enriched
                 .keyBy((...) -> ride.CorrID)
                 .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));

My class DynamicSessionWindows should do the job of feeding the long back to Flink, but I don't understand how. It would just be a class that implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> and returns the gap from the extract() method.

I have the theory but I would need an example of how to do it.

If anyone can help me with this by putting down some code, it would be really appreciated.

Thanks


Solution

  • Here we go, I found out how to do it. It was a simple question, but being new to Java and Flink made me struggle a bit. I also created a KeySelector:

     WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
                    .keyBy(new MyKeySelector())
                    .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
    

    And my DynamicSessionWindows() is this one:

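        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

        public class DynamicSessionWindows implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> {

            // f1 holds the gap in milliseconds that was attached upstream by StatefulSessionCalculator
            @Override
            public long extract(Tuple2<Event, Long> value) {
                return value.f1;
            }
        }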
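
To see what the extracted gap does, here is a small plain-Java simulation (no Flink dependency) of how session windows with a per-element gap are formed, as far as I understand the behaviour: each element opens a window from its timestamp to timestamp plus gap, and windows that overlap or touch are merged into one session. The class `DynamicGapSessions` and its input format are made up for this illustration and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simulation of session merging with a per-element gap. */
final class DynamicGapSessions {

    /**
     * Each input element is {timestampMs, gapMs}.
     * Returns the merged sessions as {startMs, endMs} pairs, ordered by start.
     */
    static List<long[]> sessions(List<long[]> elements) {
        // Step 1: every element proposes its own window [timestamp, timestamp + gap).
        List<long[]> windows = new ArrayList<>();
        for (long[] e : elements) {
            windows.add(new long[] {e[0], e[0] + e[1]});
        }
        windows.sort(Comparator.comparingLong(w -> w[0]));

        // Step 2: merge windows that overlap or touch the current session.
        List<long[]> merged = new ArrayList<>();
        for (long[] w : windows) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] <= last[1]) {
                last[1] = Math.max(last[1], w[1]); // extend, never shrink
            } else {
                merged.add(new long[] {w[0], w[1]});
            }
        }
        return merged;
    }
}
```

One consequence worth noting: a later element with a small gap cannot shorten a session that an earlier element with a large gap already extended. If the calculated gap shrinks as the data gets denser, the session only ends once no window proposed by any element reaches the next element.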