apache-flink, flink-streaming

Alternatives to GlobalWindow for rolling aggregation


I'm wondering if Flink is suitable for the following use case. Let's say I have a stream of measurements (device_id, value), e.g.

(1, 10.2), (2, 3.4), (3, 9.1), (1, 7.0), (3, 6.3), (5, 17.8)

I want to report, every minute, the latest value for every device_id that has been seen so far.

Given the data:

data:  (1, 10.2), (2, 3.4), (3, 9.1), (1, 7.0), (3, 6.3), (5, 17.8)

time: 0 ----------------- 1min -------------- 2min ------------------ 3min

I'd like to have a result:

1: { (1, 10.2), (2, 3.4) }

2: { (1, 7.0), (2, 3.4), (3, 9.1) }

3: { (1, 7.0), (2, 3.4), (3, 6.3), (5, 17.8) }

I came up with an implementation that includes

.windowAll(GlobalWindows.create()).trigger(CountTrigger.of(1)).apply( ... ) 

but it doesn't look good (memory-wise) on a large dataset. Is there another way to do this?


Solution

  • You might want to consider something more like this as the starting point:

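    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.util.Collector;

    import javax.annotation.Nullable;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class StreamingJob {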
      private static final TimeUnit windowTimeUnit = TimeUnit.SECONDS;
      private static final long windowLength = 10;
    
      private static long getNearestRightBoundaryFor(Long timestamp, Long duration, TimeUnit unit){
        Long durationEpoch = unit.toMillis(duration);
        Long quotient = timestamp / durationEpoch;
        return (quotient + 1) * durationEpoch - 1;
      }
    
      public static void main(String[] args) throws Exception {
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.fromElements(
                Tuple3.of(1000L, 1L, 3.8f), Tuple3.of(2003L, 2L, 82.3f), Tuple3.of(3006L, 1L, 4.2f), // 0 - 09
                Tuple3.of(11120L, 2L, 10f), Tuple3.of(12140L, 2L, 7.15f), Tuple3.of(13150L, 3L, 3.33f), // 10 - 19
                Tuple3.of(21200L, 2L, 1.09f), Tuple3.of(22270L, 1L, 2.22f), Tuple3.of(23280L, 2L, 3.8f), // 20 - 29
                Tuple3.of(31310L, 3L, 3.12f), Tuple3.of(32330L, 2L, 9.2f), Tuple3.of(33390L, 1L, 4.0f) // 30 - 39
        )
        .assignTimestampsAndWatermarks(
                new AssignerWithPunctuatedWatermarks<Tuple3<Long,Long,Float>>() {
                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(Tuple3<Long, Long, Float> lastElement, long extractedTimestamp) {
                        return new Watermark(extractedTimestamp);
                    }
    
                    @Override
                    public long extractTimestamp(Tuple3<Long, Long, Float> element, long previousElementTimestamp) {
                        return element.f0;
                    }
                })
        .keyBy(new KeySelector<Tuple3<Long,Long,Float>, Long>() {
            @Override
            public Long getKey(Tuple3<Long, Long, Float> value) throws Exception {
                return value.f1;
            }
        })
        .process(new KeyedProcessFunction<Long, Tuple3<Long,Long,Float>, Tuple4<Long, Long, Long, Float>>() {
            private ValueState<Tuple3<Long, Long, Float>> state;
    
            @Override
            public void open(Configuration parameters) {
                ValueStateDescriptor<Tuple3<Long, Long, Float>> descriptor = new ValueStateDescriptor<>(
                        "state",
                        TypeInformation.of(new TypeHint<Tuple3<Long, Long, Float>>() {
                        }));
    
                state = getRuntimeContext().getState(descriptor);
            }
    
            @Override
            public void processElement(Tuple3<Long, Long, Float> value, Context ctx, Collector<Tuple4<Long, Long, Long, Float>> out) throws Exception {
                Tuple3<Long, Long, Float> currentValue = state.value();
                if (currentValue == null) {
                    Long ts = getNearestRightBoundaryFor(value.f0, windowLength, windowTimeUnit);
                    ctx.timerService().registerEventTimeTimer(ts);
                    state.update(value);
                }
                else if (value.f0 > currentValue.f0) { // ignore out-of-order events
                    state.update(value);
                }
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple4<Long, Long, Long, Float>> out) throws IOException {
                Tuple3<Long, Long, Float> currentValue = state.value();
                // Emit (timer timestamp, event timestamp, device id, value); Tuple4.of avoids the raw type.
                out.collect(Tuple4.of(timestamp, currentValue.f0, currentValue.f1, currentValue.f2));
                Long newTs = timestamp + windowTimeUnit.toMillis(windowLength);
                if (ctx.timerService().currentWatermark() < Long.MAX_VALUE) {
                    ctx.timerService().registerEventTimeTimer(newTs);
                }
            }
        })
        .print();
        env.execute("Flink FTW!");
      }
    }
    

    Some things to point out:

    I wouldn't recommend using Windows for this. With GlobalWindows it gets complicated to manage expiring state.

    I've used an AssignerWithPunctuatedWatermarks rather than an AscendingTimestampExtractor. I've done this for three reasons: (1) once you switch to running in parallel it may be difficult to ensure that the events arrive in order; (2) AscendingTimestampExtractors generate watermarks periodically (by default, every 200msec of real time) and for the purposes of this example the app has consumed all of its input before the first watermark is generated; (3) a simple check in the processElement method is all that's needed to cope with out-of-order events. But you're probably better off with an AscendingTimestampExtractor in production, if the events are truly in order, or a BoundedOutOfOrdernessTimestampExtractor.
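To illustrate point (3), here is a minimal plain-Java sketch of the "keep only the newer event" check, with no Flink dependency. The class `LatestPerKeyDemo`, its `Reading` holder, and the `HashMap` standing in for keyed `ValueState` are all hypothetical names made up for this illustration; they are not part of the job above.

```java
import java.util.HashMap;
import java.util.Map;

public class LatestPerKeyDemo {
    /** One measurement: event timestamp in milliseconds plus the measured value. */
    static final class Reading {
        final long timestamp;
        final float value;

        Reading(long timestamp, float value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Hypothetical stand-in for Flink's per-key ValueState: one Reading per device id.
    private final Map<Long, Reading> latest = new HashMap<>();

    /** Stores the reading unless a newer one is already held for this device. */
    void accept(long timestamp, long deviceId, float value) {
        Reading current = latest.get(deviceId);
        if (current == null || timestamp > current.timestamp) {
            latest.put(deviceId, new Reading(timestamp, value));
        }
    }

    /** Returns the newest reading seen for the device, or null if none was seen. */
    Reading latestFor(long deviceId) {
        return latest.get(deviceId);
    }

    public static void main(String[] args) {
        LatestPerKeyDemo demo = new LatestPerKeyDemo();
        demo.accept(3006L, 1L, 4.2f);
        demo.accept(1000L, 1L, 3.8f); // arrives late with an older timestamp: ignored
        System.out.println(demo.latestFor(1L).value);
    }
}
```

The state per key stays at a single element however many events arrive, which is what addresses the memory concern from the question.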

    The output looks like this:

    (9999,11120,2,10.0)
    (19999,21200,2,1.09)
    (19999,13150,3,3.33)
    (29999,23280,2,3.8)
    (29999,31310,3,3.12)
    (39999,32330,2,9.2)
    (39999,31310,3,3.12)
    (9999,3006,1,4.2)
    (19999,3006,1,4.2)
    (29999,22270,1,2.22)
    (39999,33390,1,4.0)
    

    The reason (11120,2,10.0) is emitted by the timer for 9999 is that the arrival of the event with timestamp 11120 is what advances the watermark past 9999, causing that timer to fire. By the time onTimer is called, processElement has already been called for that event and has updated the state.
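The timer timestamps in the output (9999, 19999, ...) come from the boundary arithmetic in getNearestRightBoundaryFor. A small standalone sketch of that arithmetic, assuming the same 10-second window as the job; the class name `BoundaryDemo` and method name `rightBoundary` are hypothetical:

```java
import java.util.concurrent.TimeUnit;

public class BoundaryDemo {
    // Same arithmetic as getNearestRightBoundaryFor: the last millisecond
    // of the window that contains the given timestamp.
    static long rightBoundary(long timestamp, long duration, TimeUnit unit) {
        long windowMillis = unit.toMillis(duration);
        return (timestamp / windowMillis + 1) * windowMillis - 1;
    }

    public static void main(String[] args) {
        long[] samples = {1000L, 3006L, 11120L, 13150L};
        for (long ts : samples) {
            // 1000 and 3006 map to 9999; 11120 and 13150 map to 19999.
            System.out.println(ts + " -> " + rightBoundary(ts, 10, TimeUnit.SECONDS));
        }
    }
}
```

Because the first event for key 2 has timestamp 2003, its timer is registered for 9999, and that timer can only fire once some event with a timestamp of at least 9999 has pushed the watermark forward.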

    The check in onTimer for ctx.timerService().currentWatermark() < Long.MAX_VALUE is so that this finite example won't run forever. If a streaming job reaches the end of its input, a final watermark with timestamp of Long.MAX_VALUE is injected to cause one last firing of any remaining timers. In that case we shouldn't create another timer.
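The effect of that guard can be seen in a toy model of a timer queue. This is only a sketch under simplifying assumptions, not Flink's actual timer service: the class `FinalWatermarkDemo`, the method `advanceWatermark`, and the `maxFirings` cap (which exists only so the unguarded case terminates) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class FinalWatermarkDemo {
    static final long WINDOW_MILLIS = 10_000L;

    /**
     * Toy model: fires every pending timer whose timestamp is at or below the
     * watermark, in timestamp order. Each firing re-registers a timer one window
     * later, unless the guard is enabled and the watermark is Long.MAX_VALUE.
     * maxFirings caps the loop so the unguarded case stops in this demo.
     */
    static List<Long> advanceWatermark(TreeSet<Long> timers, long watermark,
                                       boolean guardEnabled, int maxFirings) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark && fired.size() < maxFirings) {
            long ts = timers.pollFirst();
            fired.add(ts);
            boolean endOfInput = watermark == Long.MAX_VALUE;
            if (!(guardEnabled && endOfInput)) {
                timers.add(ts + WINDOW_MILLIS);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TreeSet<Long> guarded = new TreeSet<>();
        guarded.add(39_999L);
        // With the guard, the last timer fires once and nothing is re-registered.
        System.out.println("guarded:   " + advanceWatermark(guarded, Long.MAX_VALUE, true, 5));

        TreeSet<Long> unguarded = new TreeSet<>();
        unguarded.add(39_999L);
        // Without the guard, every firing schedules another timer that is already due.
        System.out.println("unguarded: " + advanceWatermark(unguarded, Long.MAX_VALUE, false, 5));
    }
}
```

In the unguarded case only the artificial cap stops the loop, which mirrors why the example job skips re-registration once the final watermark has arrived.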