
How to implement a WindowableTask similar to Samza's in Apache Flink?


Samza has a concept of windowing in which a stream processing job does something at regular intervals, regardless of how many incoming messages it is processing.

For example, a simple per-minute event counter in Samza looks like this:

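import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class EventCounterTask implements StreamTask, WindowableTask {

  public static final SystemStream OUTPUT_STREAM =
    new SystemStream("kafka", "events-per-minute");

  // Events seen since the last window() call.
  private int eventsSeen = 0;

  // Called once for every incoming message.
  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    eventsSeen++;
  }

  // Called periodically (interval set by the task.window.ms config),
  // whether or not any messages arrived in the meantime.
  @Override
  public void window(MessageCollector collector,
                     TaskCoordinator coordinator) {
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
    eventsSeen = 0;
  }
}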

Can someone tell me how to implement the equivalent in Apache Flink, or point me to the relevant documentation? (Samza is single-threaded, so window() and process() never run concurrently.)


Solution

  • There are at least four different ways to interpret "per-minute". Along one binary dimension there's the distinction between event time and processing time (one minute as measured by timestamps in the events, or one minute as measured by the CPU wall clock). The other binary dimension is whether the minutes are aligned to UTC (whole clock minutes) or to the first event.

    The relevant lower-level mechanisms available to you in Flink are event-time and processing-time windows, and timers, which are part of process functions. For self-paced tutorials, examples, and exercises with solutions, see Learn Flink: Hands-on Training.

    But with Flink, windowing is more readily done with SQL or the Table API. For example, a simple event counter over processing-time minutes looks like this:

    SELECT COUNT(*)
    FROM Events
    GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE)
    

    For more, see the docs on windowing with SQL and the docs on windowing with the Table API. For tutorials on Flink SQL, see https://github.com/ververica/sql-training.
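The closest DataStream analogue of Samza's process()/window() pair is probably a KeyedProcessFunction with a processing-time timer: processElement() plays the role of process(), and onTimer() plays the role of window(). Flink synchronizes calls to processElement() and onTimer(), so, as in Samza, they do not run concurrently on the same state. The following is a minimal sketch, assuming the Flink 1.x DataStream API, a hypothetical DataStream<String> of events, and a stream keyed by a constant (timers are only available on keyed streams); the class name EventCounterFunction is illustrative.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Sketch (hypothetical class): counts events and emits the count once per
 * processing-time minute, mirroring Samza's process()/window() pair.
 */
public class EventCounterFunction extends KeyedProcessFunction<Integer, String, Long> {

    static final long ONE_MINUTE_MS = 60_000L;

    // Keyed state: events seen since the last emission (the role of eventsSeen).
    private transient ValueState<Long> eventsSeen;

    @Override
    public void open(Configuration parameters) {
        eventsSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("eventsSeen", Long.class));
    }

    /** Like Samza's process(): called once per incoming event. */
    @Override
    public void processElement(String event, Context ctx, Collector<Long> out) throws Exception {
        Long count = eventsSeen.value();
        eventsSeen.update(count == null ? 1L : count + 1);

        // Request a callback at the next clock-aligned minute. Flink keeps at most
        // one timer per key and timestamp, so registering it on every event is harmless.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer(nextMinuteBoundary(now));
    }

    /** Like Samza's window(): called when the minute timer fires. */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long count = eventsSeen.value();
        out.collect(count == null ? 0L : count);
        eventsSeen.clear();

        // Re-arm for the following minute so a count (possibly 0) is emitted
        // every minute even when no events arrive, as with Samza's window().
        ctx.timerService().registerProcessingTimeTimer(timestamp + ONE_MINUTE_MS);
    }

    /** First epoch-aligned whole-minute boundary strictly after nowMs. */
    static long nextMinuteBoundary(long nowMs) {
        return nowMs - (nowMs % ONE_MINUTE_MS) + ONE_MINUTE_MS;
    }
}
```

It would be wired up with something like `events.keyBy(e -> 0).process(new EventCounterFunction())`. Keying by a constant sends every event to a single parallel instance; keying by something else gives one counter per key. Unlike Samza, the first timer is only registered once the first event for a key arrives.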
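Processing-time windows in the DataStream API give roughly the same result as the SQL query with less code than timers. Below is a sketch assuming the Flink 1.x API (where TumblingProcessingTimeWindows.of takes a Time); the socket source on localhost:9999 and the class name PerMinuteCountJob are hypothetical stand-ins for a real input.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PerMinuteCountJob {

    /** Counts elements; the accumulator is just a running Long. */
    public static class CountAggregate implements AggregateFunction<String, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input: one event per line from a local socket (e.g. `nc -lk 9999`).
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        // Non-keyed one-minute processing-time windows; windowAll runs with parallelism 1.
        events
            .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .aggregate(new CountAggregate())
            .print();

        env.execute("events-per-minute");
    }
}
```

One difference from Samza's window(): Flink creates a window only when its first element arrives, so a minute with no events produces no output rather than a count of 0.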
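To make the alignment dimension concrete, here is a plain-Java sketch (no Flink dependency; the class and method names are illustrative) that computes which minute a timestamp falls into under the two alignments. The timestamp can be either an event timestamp or a wall-clock reading, which is the other dimension. Flink's tumbling windows are aligned to the epoch by default (they also accept an offset), which corresponds to alignedToUtc.

```java
import java.time.Duration;
import java.time.Instant;

public class MinuteAlignment {

    static final long MINUTE_MS = Duration.ofMinutes(1).toMillis();

    // Start of the UTC (epoch-aligned) minute containing tsMs.
    static long alignedToUtc(long tsMs) {
        return tsMs - Math.floorMod(tsMs, MINUTE_MS);
    }

    // Start of the minute containing tsMs when minutes are counted from the first event.
    static long alignedToFirstEvent(long tsMs, long firstEventMs) {
        return firstEventMs + Math.floorDiv(tsMs - firstEventMs, MINUTE_MS) * MINUTE_MS;
    }

    public static void main(String[] args) {
        long first = Instant.parse("2020-01-01T00:00:30Z").toEpochMilli();
        long later = Instant.parse("2020-01-01T00:01:10Z").toEpochMilli();

        System.out.println(Instant.ofEpochMilli(alignedToUtc(later)));               // 2020-01-01T00:01:00Z
        System.out.println(Instant.ofEpochMilli(alignedToFirstEvent(later, first))); // 2020-01-01T00:00:30Z
    }
}
```

The same event at 00:01:10 lands in the 00:01 minute under UTC alignment but in the minute starting at 00:00:30 when minutes are measured from the first event, so the two interpretations produce different counts.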