
Flink window function and watermarks


I'm new to Flink, and I've started a project where I have to create windowed functions. My main code looks like this:

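import java.util.HashMap;
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Main {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9093");
        properties.setProperty("group.id", "test");

        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        conf.setInteger(RestOptions.PORT, 8082);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // configuration hashmap
        HashMap<String, Integer> config = new HashMap<>();
        /** added data to config hashmap **/
        config.put("620467-DTC", 514);
        config.put("383069-DCDC_1", 64);
        System.out.println(config.toString());

        SingleOutputStreamOperator<MetricObject> stream = env
                .addSource(new FlinkKafkaConsumer<>("input_topic", new JsonDeserializationSchema(), properties))
                .flatMap(new MetricFilter(config));

        SingleOutputStreamOperator<Tuple2<String, Long>> windowedStream = stream
                .assignTimestampsAndWatermarks(new RecordWatermark().withTimestampAssigner(new ExtractRecordTimestamp()))
                .keyBy(new MetricGrouper())
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new WindowedCount());

        windowedStream.print();

        // submit the job; nothing runs until execute() is called
        env.execute();
    }
}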

The RecordWatermark class is implemented as follows:

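import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class RecordWatermark implements WatermarkStrategy<MetricObject> {

    @Override
    public TimestampAssigner<MetricObject> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return WatermarkStrategy.super.createTimestampAssigner(context);
    }

    @Override
    public WatermarkStrategy<MetricObject> withTimestampAssigner(SerializableTimestampAssigner<MetricObject> timestampAssigner) {
        return WatermarkStrategy.super.withTimestampAssigner(timestampAssigner);
    }

    @Override
    public WatermarkGenerator<MetricObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new MetricWatermarksGenerator();
    }
}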

The ExtractRecordTimestamp class has an extractTimestamp method that extracts the timestamp value directly from the record:

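import java.time.Instant;
import java.time.format.DateTimeFormatter;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExtractRecordTimestamp implements SerializableTimestampAssigner<MetricObject> {

    private static final Logger log = LoggerFactory.getLogger(ExtractRecordTimestamp.class);

    @Override
    public long extractTimestamp(MetricObject element, long recordTimestamp) {
        Instant timestamp = element.getTimestamp();
        log.info("Event time: {}", DateTimeFormatter.ISO_INSTANT.format(timestamp));
        return timestamp.toEpochMilli();
    }
}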

For watermark generation, I've implemented a class that implements the WatermarkGenerator interface:


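import java.time.Instant;
import java.time.format.DateTimeFormatter;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricWatermarksGenerator implements WatermarkGenerator<MetricObject> {

    private static final Logger log = LoggerFactory.getLogger(MetricWatermarksGenerator.class);

    private final long maxOutOfOrderness = 5000; // 5 seconds
    private long currentMaxTimestamp;
    private long currentTime;

    @Override
    public void onEvent(MetricObject event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        currentTime = System.nanoTime();

        final String eventTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(eventTimestamp));
        final String currentMaxTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(currentMaxTimestamp));

        log.info("Event timestamp: {}, maxTimestamp: {}", eventTsString, currentMaxTsString);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}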

I produce messages to the Kafka topic with a simple Java producer, with Kafka running in a Docker container. The problem is that the windowing doesn't work, and records are not processed by the count function. I added some logging to the classes involved; here is an example of the output:

19:29:27.425 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:27.872 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:52Z'
19:29:27.882 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:25:52Z
19:29:27.902 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:52Z, maxTimestamp: 2021-04-21T16:25:52Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:55Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:55Z'
19:29:29.304 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:58Z
19:29:29.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:58Z'
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:25:58Z
19:29:29.312 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:58Z, maxTimestamp: 2021-04-21T16:25:58Z
19:29:30.306 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:30.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:30.310 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:26:00Z
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:00Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:30.545 [Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, WindowedCount) -> Sink: Print to Std. Out (2/4)#0] INFO  WindowedCount - watermark:2021-04-21T16:25:54.999Z
2> (620467,1)
19:29:31.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:26Z
19:29:31.319 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:31.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:26Z'
19:29:32.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:50Z
19:29:32.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:55.228960Z
19:29:32.325 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:50Z'
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:25:50Z
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:50Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:33.323 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:33.327 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:46.161843Z
19:29:33.331 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:34.329 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:27:57Z
19:29:34.332 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:41.633868Z
19:29:34.336 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:27:57Z'
19:29:35.337 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:37.399379Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:26:01Z'
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:01Z, maxTimestamp: 2021-04-21T16:26:01Z

It seems that only the first record is processed and the following records are ignored. These are the input records I'm using:

{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:52Z":{"16":{"DTC":{....
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:55Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:58Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:26:00Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:26Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:25:50Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:26:00Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":620685,"samples":{"2021-04-21T16:27:57Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:26:01Z":{"6":{"DCDC_1":...

where the timestamp inside samples is the actual event time that should be used. These records are filtered so that only those with rootAssetId 620467 or 383069 are kept (the filtering logic isn't important here).


Solution

    When Flink's event time windowing fails to produce results, it is either because

    1. the window is empty, or
    2. a sufficiently large watermark isn't being generated.

    In this case, reason #2 applies.

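    With SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)), every event is assigned to two overlapping 10-second windows; for example, the event at 16:25:58 belongs to both [16:25:50, 16:26:00) and [16:25:55, 16:26:05). The window [16:25:45, 16:25:55) did fire, which is the (620467,1) line in your output, because the watermark reached 16:25:54.999. After that, however, the largest timestamp that reaches the watermark generator is 16:26:01, so the watermark never gets past 16:25:55.999. For the next window, which ends at 16:25:59.999, to be triggered, a watermark of at least 16:25:59.999 must come along, indicating that the stream is now complete up through that timestamp.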
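To make the window arithmetic concrete, here is a minimal plain-Java sketch (no Flink dependency) of how sliding event-time windows with offset 0 are assigned: every window whose start is a multiple of the slide and that contains the timestamp. The class and method names (`SlidingWindowAssignment`, `assign`) are hypothetical; the loop is modeled on the arithmetic Flink's `SlidingEventTimeWindows` uses, but it is an illustration, not Flink's code. It uses the question's 10-second size and 5-second slide.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Flink.
public class SlidingWindowAssignment {

    /** Half-open window [start, end); the last timestamp it covers is end - 1. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + Instant.ofEpochMilli(start) + ", " + Instant.ofEpochMilli(end) + ")";
        }
    }

    /** All sliding windows (offset 0) of the given size and slide that contain ts. */
    static List<Window> assign(long ts, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        String[] events = {"2021-04-21T16:25:52Z", "2021-04-21T16:25:58Z", "2021-04-21T16:26:01Z"};
        for (String e : events) {
            long ts = Instant.parse(e).toEpochMilli();
            // 10-second windows sliding by 5 seconds, as configured in the question's job
            System.out.println(e + " -> " + assign(ts, 10_000, 5_000));
        }
    }
}
```

For 16:25:52 this yields [16:25:50, 16:26:00) and [16:25:45, 16:25:55); the second of these is the only window containing a 620467 event that the watermark 16:25:54.999 could close, which is consistent with the single (620467,1) result in the log.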

    For such a watermark to be generated by your watermark strategy, an event with a timestamp of 16:26:05 (or later) has to appear in the filtered input. That doesn't seem to be happening.
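The same reasoning can be checked numerically. The minimal plain-Java sketch below (hypothetical names `WatermarkProgress`, `watermarkFor`, `minTimestampToFire`) replays the timestamps of the events that passed `MetricFilter` in the question's log, applies the question's formula (max timestamp minus 5 seconds minus 1 ms), and computes the smallest event timestamp that lets a given window fire. It assumes, as in Flink, that an event-time window fires once the watermark reaches the window's end minus 1 ms, and it ignores the fact that the real generator only emits watermarks periodically.

```java
import java.time.Instant;

// Hypothetical illustration class, not part of Flink.
public class WatermarkProgress {

    static final long MAX_OUT_OF_ORDERNESS = 5_000; // 5 seconds, as in the question

    /** Same formula as the question's onPeriodicEmit. */
    static long watermarkFor(long maxTimestamp) {
        return maxTimestamp - MAX_OUT_OF_ORDERNESS - 1;
    }

    /**
     * A window [start, end) fires once watermark >= end - 1, i.e. once
     * maxTimestamp - MAX_OUT_OF_ORDERNESS - 1 >= end - 1.
     */
    static long minTimestampToFire(long windowEnd) {
        return windowEnd + MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        // Event times that passed MetricFilter, in arrival order (taken from the question's log)
        String[] arrivals = {
            "2021-04-21T16:25:52Z", "2021-04-21T16:25:58Z", "2021-04-21T16:26:00Z",
            "2021-04-21T16:25:50Z", "2021-04-21T16:26:01Z"
        };
        long max = Long.MIN_VALUE;
        for (String s : arrivals) {
            max = Math.max(max, Instant.parse(s).toEpochMilli());
            System.out.println("after " + s + ": next watermark = " + Instant.ofEpochMilli(watermarkFor(max)));
        }
        long end = Instant.parse("2021-04-21T16:26:00Z").toEpochMilli();
        System.out.println("window ending at 16:26:00 needs an event at or after "
                + Instant.ofEpochMilli(minTimestampToFire(end)));
    }
}
```

With these inputs the watermark stops at 16:25:55.999, and the window ending at 16:26:00 needs an event timestamped 16:26:05 or later.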

    The other way to get the windows to close is for the input stream to be bounded. Whenever a bounded source reaches its end, it emits one last watermark of MAX_WATERMARK (before the job shuts down), and this closes all remaining event time windows. You can make this happen in your testing either by using the new KafkaSource introduced in Flink 1.14 along with its setBounded option, or by implementing a deserializer that at some point returns true from its isEndOfStream method.
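A toy model may help show why a bounded source flushes everything: the final watermark carries `Long.MAX_VALUE` (the value behind Flink's `Watermark.MAX_WATERMARK`), so every pending window satisfies the firing condition. The class `EndOfInputFlush` below is hypothetical and only models the trigger condition in plain Java; it is not how Flink stores or fires window state. Times are milliseconds after 16:25:00, so the three pending windows end at 16:26:00, 16:26:05 and 16:26:10.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy model of event-time triggering, not Flink code.
public class EndOfInputFlush {

    // Pending windows keyed by their end timestamp (ms), with an element count per window.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    // End timestamps of the windows that have fired, in firing order.
    final List<Long> firedEnds = new ArrayList<>();

    void addElement(long windowEnd) {
        pending.merge(windowEnd, 1, Integer::sum);
    }

    // Fire (and drop) every pending window whose last timestamp (end - 1) is <= watermark.
    void onWatermark(long watermark) {
        while (!pending.isEmpty() && pending.firstKey() - 1 <= watermark) {
            firedEnds.add(pending.pollFirstEntry().getKey());
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        EndOfInputFlush model = new EndOfInputFlush();
        model.addElement(60_000); // window ending 16:26:00
        model.addElement(65_000); // window ending 16:26:05
        model.addElement(70_000); // window ending 16:26:10

        model.onWatermark(55_999); // 16:25:55.999, where the question's watermark stalls
        System.out.println("fired after 16:25:55.999: " + model.firedEnds);

        model.onWatermark(Long.MAX_VALUE); // what a bounded source emits when it ends
        System.out.println("fired after MAX_WATERMARK: " + model.firedEnds);
    }
}
```

The first watermark fires nothing because no pending window's last timestamp is at or below 16:25:55.999; the final `Long.MAX_VALUE` watermark fires all three.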


    You can simplify your watermarking code. I believe what you have written is equivalent to this, which is easier to understand and maintain:

    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<MetricObject>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) ->
                event.getTimestamp().toEpochMilli()));