Tags: java, stream, apache-flink, flink-streaming

Late outputs missing for Flink's Session Window


In my pipeline I cannot see any late-data side output for a Session Window. I'm using Flink 1.9.1.

Version 1. What I have is this:

messageStream
    .keyBy(tradeKeySelector)
    .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
    .sideOutputLateData(lateTradeMessages)
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");

TradeAggregationGapExtractor implements SessionWindowTimeGapExtractor and returns a gap of 5 seconds; lateTradeMessages is the tag passed to sideOutputLateData.

Further I have this:

messageStream.getSideOutput(lateTradeMessages)
    .keyBy(tradeKeySelector)
    .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
        @Override
        public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
            System.out.println("Process Late messages For Aggregation");
            out.collect(new Transaction());
        }
    })
    .name("Process Late messages For Aggregation");

The problem is that I never see "Process Late messages For Aggregation" when I send messages with the same key that should miss the window.

When the Session Window has passed and I "immediately" send a new message for the same key, it triggers a new Session Window instead of going to the late side output.

I'm not sure what I'm doing wrong here.

What I would like to achieve here, is to catch "late events" and try to reprocess them.

I will appreciate any help.


Version 2, after @Dominik Wosiński's comment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);

DataStream<RawMessage> rawBusinessTransaction = env
    .addSource(new FlinkKafkaConsumer<>("business",
        new JSONKeyValueDeserializationSchema(false), properties))
    .map(new KafkaTransactionObjectMapOperator())
    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(System.currentTimeMillis());
        }

        @Override
        public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
            return element.messageCreationTime;
        }
    })
    .name("Kafka Transaction Raw Data Source.");

messageStream
    .keyBy(tradeKeySelector)
    .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
    .sideOutputLateData(lateTradeMessages)
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");

Watermarks are progressing; I've checked this in Flink's metrics. The window operator is executing, but there are still no late outputs.

BTW, the Kafka topic can be idle, so I have to emit new watermarks periodically.



Solution

  • The watermark approach looks very suspicious to me. Usually, you would output the latest event timestamp at this point.

    Just some background information, so that it's easier to understand.

    Late events are events that arrive after the watermark has already progressed to a time later than the event's timestamp. Consider the following example:

    event1 @time 1
    event2 @time 2
    watermark1 @time 3
    event3 @time 1 <-- late event
    event4 @time 4
    

    Because your watermark is the current wall-clock time, it would render pretty much all past events late (with a bit of tolerance because of the 1 s watermark interval). This would also make reprocessing and catch-ups impossible.

    However, you are actually not seeing any late events, which is even more surprising to me. Can you double-check your watermark approach, describe your use case, and provide example data? Oftentimes the implementation is not ideal for the actual use case, and it should be solved in a different way.
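
To make the background above concrete, here is a rough, Flink-free sketch in plain Java. It replays the example stream from above and then compares a wall-clock watermark with one derived from the latest event timestamp. The names LateEventDemo, Item, findLateEvents and countLate are made up for this illustration and are not Flink API. The lateness rule is also simplified: an event counts as late when its timestamp is not greater than the current watermark, whereas Flink's window operator compares the end of the window (plus allowed lateness) against the watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these types exist in Flink.
class LateEventDemo {

    /** One item in the stream: either an event or a watermark, both carrying a timestamp. */
    static final class Item {
        final boolean isWatermark;
        final long timestamp;

        Item(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }

        static Item event(long ts) { return new Item(false, ts); }
        static Item watermark(long ts) { return new Item(true, ts); }
    }

    /** Returns the timestamps of events that arrive after the watermark has reached or passed them. */
    static List<Long> findLateEvents(List<Item> stream) {
        long currentWatermark = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (Item item : stream) {
            if (item.isWatermark) {
                // Watermarks never move backwards.
                currentWatermark = Math.max(currentWatermark, item.timestamp);
            } else if (item.timestamp <= currentWatermark) {
                late.add(item.timestamp);
            }
        }
        return late;
    }

    /**
     * Replays events in the given order, emitting a watermark after each one, and counts
     * the late events. The watermark is either a fixed "now" (wall-clock style) or the
     * largest event timestamp seen so far.
     */
    static int countLate(long[] eventTimes, boolean wallClockWatermark, long now) {
        List<Item> stream = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            stream.add(Item.event(ts));
            maxSeen = Math.max(maxSeen, ts);
            stream.add(Item.watermark(wallClockWatermark ? now : maxSeen));
        }
        return findLateEvents(stream).size();
    }

    public static void main(String[] args) {
        // The example from above: event3 @time 1 arrives after watermark1 @time 3.
        List<Item> example = Arrays.asList(
                Item.event(1), Item.event(2), Item.watermark(3), Item.event(1), Item.event(4));
        System.out.println(findLateEvents(example));

        // Replaying three old events while "now" is far ahead of their timestamps.
        long[] historical = {100, 200, 300};
        System.out.println(countLate(historical, true, 1_000_000L));  // wall-clock watermark
        System.out.println(countLate(historical, false, 1_000_000L)); // event-time watermark
    }
}
```

With the wall-clock variant, every replayed event after the first watermark is classified as late; with the watermark taken from the latest event timestamp, an in-order replay produces no late events. In a real job you would usually also subtract some out-of-orderness bound from the latest timestamp, which this sketch leaves out.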