
How to configure Flink window time based on its key


Different types of items arrive from a source, and I partition them into different windows by their 'type'. Each type's window now has to be configured with its own emit timeout, rather than all types sharing one .keyBy("type").window(TumblingProcessingTimeWindows.of(Time.seconds(1))).allowedLateness(Time.seconds(5)).

As an example: { {"type-1", 5m}, {"type-2", 10m}, {"type-3", 1m} }

So elements added to the keyed window of type-1 have an emit time of 5 minutes, those of type-2 have 10 minutes, and so on.
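The per-type timeouts from the example can be held in a lookup table keyed by the type string. A minimal sketch, assuming that lookup; the class name TypeTimeouts and the DEFAULT_TIMEOUT fallback for unlisted types are hypothetical:

```java
import java.time.Duration;
import java.util.Map;

public class TypeTimeouts {
    // Per-type emit timeouts taken from the example above.
    static final Map<String, Duration> TIMEOUTS = Map.of(
            "type-1", Duration.ofMinutes(5),
            "type-2", Duration.ofMinutes(10),
            "type-3", Duration.ofMinutes(1));

    // Hypothetical fallback so a lookup for an unlisted type never yields null.
    static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);

    // Timeout in milliseconds, the unit processing-time timers use.
    static long timeoutMillis(String type) {
        return TIMEOUTS.getOrDefault(type, DEFAULT_TIMEOUT).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis("type-2"));  // 600000
        System.out.println(timeoutMillis("type-99")); // 60000 (fallback)
    }
}
```

The fallback matters because a plain Map.get on a missing key returns null, and adding that to a long throws a NullPointerException.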

I can do this with a KeyedProcessFunction by keeping the buffered elements in keyed state and calling registerProcessingTimeTimer with the timeout configured for the key.
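For comparison, the KeyedProcessFunction idea boils down to a per-key buffer plus one pending timer per key. The sketch below is a plain-Java model with a simulated clock, not Flink code: the class and method names are hypothetical, the maps stand in for keyed state and registered timers, and it assumes the timer starts when the first element of a batch arrives.

```java
import java.util.*;

// Plain-Java model of the KeyedProcessFunction approach (not Flink code).
public class PerKeyTimeoutBuffer {
    private final Map<String, Long> timeoutsMs;                        // type -> emit timeout (ms)
    private final Map<String, List<String>> buffers = new HashMap<>(); // stands in for ListState
    private final TreeMap<String, Long> deadlines = new TreeMap<>();   // one pending "timer" per key
    private final List<String> emitted = new ArrayList<>();

    PerKeyTimeoutBuffer(Map<String, Long> timeoutsMs) {
        this.timeoutsMs = timeoutsMs;
    }

    // Like processElement: buffer the element; start a timer only if none is pending for this key.
    void onElement(String type, String value, long nowMs) {
        buffers.computeIfAbsent(type, k -> new ArrayList<>()).add(value);
        deadlines.putIfAbsent(type, nowMs + timeoutsMs.get(type));
    }

    // Like onTimer: flush every key whose deadline has passed and clear its state.
    void advanceTo(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMs) {
                emitted.add(e.getKey() + "@" + e.getValue() + "=" + buffers.remove(e.getKey()));
                it.remove();
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PerKeyTimeoutBuffer buf = new PerKeyTimeoutBuffer(Map.of("type-1", 300_000L, "type-3", 60_000L));
        buf.onElement("type-1", "a", 0);
        buf.onElement("type-3", "b", 0);
        buf.onElement("type-1", "c", 30_000);
        buf.advanceTo(60_000);  // type-3 is due
        buf.advanceTo(300_000); // type-1 is due
        System.out.println(buf.emitted()); // [type-3@60000=[b], type-1@300000=[a, c]]
    }
}
```

Each key waits exactly its own timeout after its first buffered element, which is the bookkeeping the question would like a window to take over.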

But instead of managing the state and elements myself, how can I do this with a window? I tried a custom Trigger but couldn't get it to work properly.


Solution

  • Below is the fix for my custom Trigger approach. I haven't used the Evictor suggested by @David; instead, onProcessingTime returns FIRE_AND_PURGE. This approach seems to work in the few scenarios I have tested.

    Slot configuration

    // Emit timeout per event type, in milliseconds.
    private static final Map<String, Long> eventSlots = new HashMap<>();
    
    static {
        eventSlots.put("type22", 30000L);
        eventSlots.put("type12", 15000L);
        eventSlots.put("type9", 10000L);
        eventSlots.put("type2", 20000L);
        eventSlots.put("type7", 13000L);
    }
    

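    Processing code

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    SingleOutputStreamOperator<Event> sourceStream = ...

    SingleOutputStreamOperator<Events> batches = sourceStream
        .keyBy(Event::getType)   // String key instead of the field-name form keyBy("type")
        .window(GlobalWindows.create())
        .trigger(new Trigger<Event, GlobalWindow>() {

            @Override
            public TriggerResult onElement(Event element, long timestamp, GlobalWindow window,
                    TriggerContext ctx) throws Exception {
                // Every element registers its own timer; assumes every type has an entry in eventSlots.
                ctx.registerProcessingTimeTimer(
                        ctx.getCurrentProcessingTime() + eventSlots.get(element.getType()));
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
                    throws Exception {
                // Emit everything buffered for this key, then empty the window.
                return TriggerResult.FIRE_AND_PURGE;
            }

            @Override
            public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
                    throws Exception {
                return TriggerResult.CONTINUE;
            }

            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
                // GlobalWindow.maxTimestamp() is Long.MAX_VALUE, so this does not delete
                // the timers registered in onElement.
                ctx.deleteProcessingTimeTimer(window.maxTimestamp());
            }
        })
        .process(new ProcessWindowFunction<Event, Events, String, GlobalWindow>() {
            @Override
            public void process(String type, Context ctx, Iterable<Event> elements,
                    Collector<Events> out) throws Exception {
                // Collect the fired window contents into one Events batch.
                List<Event> events = new ArrayList<>();
                elements.forEach(events::add);
                out.collect(new Events(events));
            }
        });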
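Because every element registers its own timer, the first timer to fire flushes everything buffered for that key, and each later timer flushes whatever has arrived since. The plain-Java model below (not Flink code; class and method names are hypothetical) replays that behaviour for one key with a 5-second timeout. It assumes a timer that fires on an empty window produces no output.

```java
import java.util.*;

// Plain-Java model of the trigger above for a single key (not Flink code).
public class PerElementTimerModel {
    private final long timeoutMs;
    private final List<String> window = new ArrayList<>(); // window contents for one key
    private final TreeSet<Long> timers = new TreeSet<>();  // registered processing-time timers
    private final List<String> fired = new ArrayList<>();  // "timerTime=[elements]" per emission

    PerElementTimerModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Mirrors onElement: register a timer for this element and keep collecting.
    void onElement(String value, long nowMs) {
        window.add(value);
        timers.add(nowMs + timeoutMs);
    }

    // Mirrors onProcessingTime returning FIRE_AND_PURGE for every timer that is due.
    void advanceTo(long nowMs) {
        while (!timers.isEmpty() && timers.first() <= nowMs) {
            long t = timers.pollFirst();
            if (!window.isEmpty()) { // assumption: an empty window emits nothing
                fired.add(t + "=" + window);
                window.clear();
            }
        }
    }

    List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        PerElementTimerModel m = new PerElementTimerModel(5_000);
        m.onElement("a", 0);     // timer at 5000
        m.onElement("b", 4_000); // timer at 9000
        m.advanceTo(5_000);      // a and b are emitted together
        m.onElement("c", 6_000); // timer at 11000
        m.advanceTo(11_000);     // b's leftover timer at 9000 emits c after only 3000 ms
        System.out.println(m.fired()); // [5000=[a, b], 9000=[c]]
    }
}
```

If each batch should instead wait the full timeout from its first element, one option (not tested here) is to register a timer only when none is pending for the window, tracking the pending timestamp in trigger state obtained through ctx.getPartitionedState and clearing it in onProcessingTime.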