apache-flink, flink-streaming

Flink windowing: aggregate and output to sink


We have a stream of data where each element is of this type:

id: String
type: Type
amount: Integer

We want to aggregate this stream and output the sum of amount once per week.

Current solution:

An example Flink pipeline looks like this:

stream.keyBy(type)
      .window(TumblingProcessingTimeWindows.of(Time.days(7)))
      .reduce(sumAmount())
      .addSink(someOutput());
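
Here sumAmount() just stands for an ordinary ReduceFunction that adds the amounts of two elements. A minimal sketch, assuming Element exposes getAmount(), getId(), getType() and an (id, type, amount) constructor:

// Illustrative only: combine two elements of the same key by adding their amounts
private static ReduceFunction<Element> sumAmount() {
    return (a, b) -> new Element(a.getId(), a.getType(), a.getAmount() + b.getAmount());
}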

For input

| id | type | amount |
| 1  | CAT  | 10     |
| 2  | DOG  | 20     |
| 3  | CAT  | 5      |
| 4  | DOG  | 15     |
| 5  | DOG  | 50     |

If the window ends between records 3 and 4, our output would be:

| TYPE | sumAmount |
| CAT  | 15        | (id 1 and id 3 added together)
| DOG  | 20        | (only id 2 has been 'summed')

Ids 4 and 5 would still be inside the Flink pipeline and will be output next week.

Thus next week our total output would be:

| TYPE | sumAmount |
| CAT  | 15        | (of last week)
| DOG  | 20        | (of last week)
| DOG  | 65        | (id 4 and id 5 added together)

New requirement:

We now also want to know, for each record, in which week it was processed. In other words, our new output should be:

| TYPE | sumAmount | weekNumber |
| CAT  | 15        | 1          |
| DOG  | 20        | 1          |
| DOG  | 65        | 2          |

but we also want an additional output like this:

| id | weekNumber |
| 1  | 1          |
| 2  | 1          |
| 3  | 1          |
| 4  | 2          |
| 5  | 2          |

How to handle this?

Does Flink have any way to achieve this? I would imagine an aggregate function that sums the amounts but also outputs each record together with the current week number, for example, but I can't find a way to do this in the docs.

(Note: we process about 100 million records a week, so ideally we would like to keep only the aggregates in Flink's state during the week, not all individual records.)

EDIT:

I went for the solution described by Anton below:

// process() returns a SingleOutputStreamOperator, which is what exposes getSideOutput()
SingleOutputStreamOperator<Element> elements =
    stream.keyBy(type)
          .process(myKeyedProcessFunction());

elements.addSink(outputElements());
elements.getSideOutput(outputTag)
        .addSink(outputAggregates());

And the KeyedProcessFunction looks something like:

class MyKeyedProcessFunction extends KeyedProcessFunction<Type, Element, Element> {
    private ValueState<ZonedDateTime> state;
    private ValueState<Integer> sum;

    public void processElement(Element e, Context c, Collector<Element> out) throws Exception {
        if (state.value() == null) {
            // first element of a new aggregation period: remember its start and arm the weekly timer
            state.update(ZonedDateTime.now());
            sum.update(0);
            c.timerService().registerProcessingTimeTimer(nowPlus7Days);
        }
        e.addAggregationId(state.value());
        sum.update(sum.value() + e.getAmount());
        out.collect(e);
    }

    public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) throws Exception {
        // emit the weekly aggregate to the side output, then reset for the next period
        c.output(outputTag, sum.value());
        state.clear();
        sum.clear();
    }
}
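
The snippet above leaves out where state, sum, outputTag, and nowPlus7Days come from; a minimal sketch of that wiring (names are illustrative) could look like this:

// Side-output tag for the weekly aggregates (anonymous subclass keeps the type information)
private static final OutputTag<Integer> outputTag = new OutputTag<Integer>("aggregates") {};

@Override
public void open(Configuration parameters) {
    state = getRuntimeContext().getState(
        new ValueStateDescriptor<>("periodStart", ZonedDateTime.class));
    sum = getRuntimeContext().getState(
        new ValueStateDescriptor<>("sum", Integer.class));
}

// inside processElement, the timer timestamp could be computed as:
long nowPlus7Days = c.timerService().currentProcessingTime() + Duration.ofDays(7).toMillis();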

Solution

  • There's a variant of the reduce method that takes a ProcessWindowFunction as a second argument. You would use it like this:

    stream.keyBy(type)
          .window(TumblingProcessingTimeWindows.of(Time.days(7)))
          .reduce(sumAmount(), new WrapWithWeek())
          .addSink(someOutput());
    
    private static class WrapWithWeek
        extends ProcessWindowFunction<Event, Tuple3<Type, Long, Long>, Type, TimeWindow> {

        public void process(Type key,
                            Context context,
                            Iterable<Event> reducedEvents,
                            Collector<Tuple3<Type, Long, Long>> out) {
            // reduce() has already pre-aggregated, so the Iterable contains exactly one element
            Event reduced = reducedEvents.iterator().next();
            out.collect(new Tuple3<>(key, context.window().getStart(), Long.valueOf(reduced.getAmount())));
        }
    }
    

    Normally a ProcessWindowFunction is passed an Iterable holding all of the events collected by the window, but if you are using a reduce or aggregate function to pre-aggregate the window result, then only that single value is passed into the Iterable. The documentation for this is here but the example in the docs currently has a small bug which I've fixed in my example here.

    But given the new requirement for the second output, I suggest you abandon the idea of doing this with windows and instead use a keyed ProcessFunction, as sketched below. You'll need two pieces of per-key ValueState: one that counts up by weeks, and another to store the sum. You'll also need a timer that fires once a week: when it fires, it should emit the type, sum, and week number, and then increment the week number. Meanwhile, the processElement method simply outputs the ID of each incoming event along with the current value of the week counter.
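
    A rough sketch of that idea, in the spirit of the KeyedProcessFunction above (names like WeeklyAggregator and aggregatesTag, the getId() accessor, and the Tuple output types are illustrative, not part of any existing API):

    // (imports from org.apache.flink.* and java.time omitted for brevity)
    // Main output: (id, weekNumber) per record; side output: (type, sum, weekNumber) once per week per key.
    class WeeklyAggregator extends KeyedProcessFunction<Type, Element, Tuple2<String, Integer>> {

        static final OutputTag<Tuple3<Type, Integer, Integer>> aggregatesTag =
            new OutputTag<Tuple3<Type, Integer, Integer>>("aggregates") {};

        private transient ValueState<Integer> weekCounter;
        private transient ValueState<Integer> sum;

        @Override
        public void open(Configuration parameters) {
            weekCounter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("week", Integer.class));
            sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Integer.class));
        }

        @Override
        public void processElement(Element e, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            if (weekCounter.value() == null) {
                // first element for this key: start week 1 and arm the weekly timer
                weekCounter.update(1);
                sum.update(0);
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + Duration.ofDays(7).toMillis());
            }
            sum.update(sum.value() + e.getAmount());
            // per-record output: in which week this record was processed
            out.collect(Tuple2.of(e.getId(), weekCounter.value()));
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            // weekly output: the aggregate for the week that just ended
            ctx.output(aggregatesTag, Tuple3.of(ctx.getCurrentKey(), sum.value(), weekCounter.value()));
            weekCounter.update(weekCounter.value() + 1);
            sum.update(0);
            // re-arm the timer for the following week
            ctx.timerService().registerProcessingTimeTimer(timestamp + Duration.ofDays(7).toMillis());
        }
    }

    Only the week counter and the running sum are kept in state per key, which matches the requirement of not keeping the individual records in Flink's state.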