
allowedLateness on Global Window custom trigger


I have created a custom trigger and processing function for my event stream.

DataStream<DynamoDBRow> dynamoDBRows =
    sensorEvents
        .keyBy("id")
        .window(GlobalWindows.create())
        .trigger(new MyCustomTrigger())
        .allowedLateness(Time.minutes(1)) // Note
        .process(new MyCustomWindowProcessFunction());

My trigger is based on an event parameter: once the event-end signal is received, MyCustomWindowProcessFunction is applied to the window's elements.

@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd()) {
      return TriggerResult.FIRE_AND_PURGE;
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

A few sensor events may still arrive after the trigger has fired, so I added .allowedLateness(Time.minutes(1)) to make sure those events are not missed during processing.

However, allowedLateness has no effect in my case.

After going through the documentation, I found this:

allowedLateness is not applicable for Global Window
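As far as I understand it, the reason is that a global window never ends: its maximum timestamp is Long.MAX_VALUE, so no watermark can pass "window end plus allowed lateness". Below is a minimal plain-Java sketch of that comparison, with no Flink dependency. The names LatenessCheck, cleanupTime and isLate are hypothetical and only model the idea; this is not Flink's actual source.

```java
public class LatenessCheck {

  // A global window has no end, so its max timestamp is modeled as Long.MAX_VALUE.
  public static final long GLOBAL_WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE;

  // Time at which a window's state would be dropped: window end plus allowed
  // lateness, saturating at Long.MAX_VALUE instead of overflowing.
  public static long cleanupTime(long windowMaxTimestamp, long allowedLatenessMillis) {
    long cleanup = windowMaxTimestamp + allowedLatenessMillis;
    return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
  }

  // A window counts as late once the watermark has reached its cleanup time.
  public static boolean isLate(long windowMaxTimestamp, long allowedLatenessMillis, long watermark) {
    return cleanupTime(windowMaxTimestamp, allowedLatenessMillis) <= watermark;
  }

  public static void main(String[] args) {
    long oneMinute = 60_000L;
    // A time window ending at t=999 with the watermark at 70_000 is late.
    System.out.println(isLate(999L, oneMinute, 70_000L)); // prints true
    // A global window is not late even for a very large watermark.
    System.out.println(isLate(GLOBAL_WINDOW_MAX_TIMESTAMP, oneMinute, Long.MAX_VALUE - 1)); // prints false
  }
}
```

Under this model, adding one minute of allowed lateness to a global window changes nothing, which matches what I observed.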

How can I get the effect of allowedLateness with a GlobalWindow?

Note: I also tried setting the environment's time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Update: 20-02-2020

I am currently trying the approach below (not working so far).

@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {

  private final long allowedLatenessMillis;

  public JourneyTrigger(Time allowedLateness) {
    this.allowedLatenessMillis = allowedLateness.toMilliseconds();
  }

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd()) {
      log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
      ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    log.info("onEventTime called at " + System.currentTimeMillis());
    return TriggerResult.FIRE_AND_PURGE;
  }


  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

Solution

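  • Finally, I was able to meet my requirement with the custom trigger below. Instead of firing as soon as the event-end signal arrives, it registers a processing-time timer and fires when that timer expires, so elements that arrive in between are still included.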

    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    
    @Slf4j
    public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {
    
      private final long allowedLatenessMillis;
    
      public JourneyTrigger(Time allowedLateness) {
        this.allowedLatenessMillis = allowedLateness.toMilliseconds();
      }
    
      @Override
      public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
    
        if (element.isEventEnd()) {
          log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
          // Fire after the grace period, measured on Flink's processing-time clock.
          ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + allowedLatenessMillis);
        }
    
        return TriggerResult.CONTINUE;
      }
    
      @Override
      public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        log.info("onProcessingTime called at "+System.currentTimeMillis() );
        return TriggerResult.FIRE_AND_PURGE;
      }
    
      @Override
      public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
      }
    
    
    
      @Override
      public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
    }
    

    Also, in the Driver.java class, set the environment's time characteristic to processing time:

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
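To illustrate the idea behind the trigger without needing a Flink cluster, here is a rough plain-Java sketch of the same grace-period logic. The class GracePeriodBuffer and its methods are hypothetical stand-ins for the window buffer and the trigger callbacks, and the clock is passed in explicitly. In Flink the timer callback is invoked by the runtime rather than polled, so treat this only as a model of the behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class GracePeriodBuffer {

  private final long graceMillis;
  private final List<String> buffered = new ArrayList<>();
  private long fireAt = Long.MAX_VALUE; // no timer registered yet

  public GracePeriodBuffer(long graceMillis) {
    this.graceMillis = graceMillis;
  }

  // Mirrors onElement: every element is buffered; the end signal only schedules a timer.
  public void onElement(String payload, boolean isEventEnd, long now) {
    buffered.add(payload);
    if (isEventEnd) {
      fireAt = now + graceMillis;
    }
  }

  // Mirrors onProcessingTime with FIRE_AND_PURGE: once the clock reaches the timer,
  // hand over everything buffered so far and clear the buffer.
  public List<String> onProcessingTime(long now) {
    if (now < fireAt) {
      return new ArrayList<>();
    }
    List<String> out = new ArrayList<>(buffered);
    buffered.clear();
    fireAt = Long.MAX_VALUE;
    return out;
  }

  public static void main(String[] args) {
    GracePeriodBuffer window = new GracePeriodBuffer(60_000L);
    window.onElement("a", false, 0L);
    window.onElement("end", true, 1_000L);          // timer set for 61_000
    window.onElement("straggler", false, 30_000L);  // arrives after the end signal
    System.out.println(window.onProcessingTime(30_000L)); // prints []
    System.out.println(window.onProcessingTime(61_000L)); // prints [a, end, straggler]
  }
}
```

The straggler that arrives after the end signal but before the timer expires ends up in the same batch, which is the behavior I wanted from allowedLateness.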