Flink KeyedStream generates duplicate results with the same key and window timestamp


Here is my Flink job workflow:

DataStream<FlinkEvent> events = env.addSource( consumer ).flatMap(...).assignTimestampsAndWatermarks( new EventTsExtractor() );
DataStream<SessionStatEvent> sessionEvents = events
    .keyBy( new KeySelector<FlinkEvent, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
            return Tuple2.of( value.getF0(), value.getSessionID() );
        }
    } )
    .window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
    .allowedLateness( Time.seconds( 10 ) )
    .aggregate( new SessionStatAggregator(), new SessionStatProcessor() );
/* ... */
sessionEvents.addSink( esSinkBuilder.build() );

First I encountered

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

in the flatMap operator, and the task kept restarting. I observed many duplicate results with different values for the same key and window timestamp.

Q1: I guess the duplicates were caused by downstream operators consuming messages again after the job restarted. Am I right? I resubmitted the job after fixing the ExceptionInChainedOperatorException problem and observed duplicates in the first time window again. After that, the job seemed to work correctly (one result per key per time window).

Q2: Where did the duplicates come from?


Solution

  • ... there should be one result per key for one window

    This is not (entirely) correct. Because of the allowedLateness, any late events (within the period of allowed lateness) will cause late (in other words, extra) firings of the relevant windows. With the default EventTimeTrigger (which you appear to be using), each late event causes an additional window firing, and an updated window result will be emitted.
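
If you need exactly one emission per key and window, one possible approach (a sketch, not the only option) is to drop allowedLateness and route late events to a side output instead, so each window fires only once. The OutputTag and variable names below are illustrative; FlinkEvent, SessionStatEvent, SessionStatAggregator, SessionStatProcessor, and the events stream are taken from the question.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Tag for events that arrive after the watermark has passed the window end.
final OutputTag<FlinkEvent> lateTag = new OutputTag<FlinkEvent>( "late-session-events" ) {};

SingleOutputStreamOperator<SessionStatEvent> sessionEvents = events
    .keyBy( new KeySelector<FlinkEvent, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
            return Tuple2.of( value.getF0(), value.getSessionID() );
        }
    } )
    .window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
    // No allowedLateness: each window fires once; late events go to the side output.
    .sideOutputLateData( lateTag )
    .aggregate( new SessionStatAggregator(), new SessionStatProcessor() );

// Late events can then be inspected, logged, or handled separately.
DataStream<FlinkEvent> lateEvents = sessionEvents.getSideOutput( lateTag );

Alternatively, if you do want the refined results that late firings produce, keep allowedLateness and make the sink idempotent, for example by writing to Elasticsearch with a deterministic document id derived from the key and the window start, so each late firing overwrites the earlier result rather than appearing as a duplicate.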