apache-kafka, apache-flink, flink-streaming

Flink's Kafka watermark strategies don't work in my application


I am using Flink version 1.13.0.

When I try to use Kafka watermark strategies as described in the Flink docs, they don't seem to work: the window process function is never invoked.

I'd also like to know whether, in this setup, the watermark timestamps come from the records' produce time or their consume time in Kafka.

My consumer code looks like this:

val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
  .setCommitOffsetsOnCheckpoints(true)
  .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
val stream = env.addSource(source)
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)

and the window is applied like this:

processStream
  .keyBy(_.num)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)
  .addSink(new SignLatSink(serverConfig.smsRuleRedis))
  .name("lat_count_sink")
  .uid("lat_count_sink")

and the topological graph looks like this:

[topological graph image]


Solution

  • Since you haven't specified a timestamp assigner in your watermark strategy, you are relying on the FlinkKafkaConsumer to have already assigned timestamps to the stream records. This only works if the records read from Kafka carry meaningful Kafka record timestamps; these are set at produce time by default (CreateTime), unless the topic is configured for LogAppendTime. Otherwise, you'll need to implement a timestamp assigner that extracts the timestamps from the events themselves.

    Note that you won't be able to implement a timestamp assigner that the FlinkKafkaConsumer can apply unless you also implement a deserializer that the FlinkKafkaConsumer can use to produce objects with timestamps that can then be extracted. Otherwise, you can opt to apply the watermark strategy somewhere after the source.
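    As a sketch of the second option (applying the strategy after the source), hedged: the `Event` case class, its `eventTime` field, and the `parseEvent` helper are assumptions here, standing in for whatever your string records actually contain.

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

    // Hypothetical event type; replace with your parsed record class.
    case class Event(num: String, eventTime: Long)

    // Parse the string records first, then apply the watermark strategy
    // after the source, extracting the timestamp from the event itself.
    val processStream = env
      .addSource(source)
      .map(raw => parseEvent(raw)) // parseEvent is an assumed helper
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
            override def extractTimestamp(event: Event, recordTimestamp: Long): Long =
              event.eventTime
          }))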

    If lack of timestamps isn't the problem, there are other things that could be the issue. For example, you might have an idle Kafka partition, or lack events far enough into the future to close the window.
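    To make the "events far enough into the future" point concrete, here is the watermark arithmetic as a small self-contained sketch (the constants mirror the semantics of Flink's bounded-out-of-orderness watermarks and event-time window firing):

    ```scala
    // With forBoundedOutOfOrderness(5s), the watermark trails the largest
    // timestamp seen so far by the out-of-orderness bound plus one millisecond.
    val outOfOrdernessMs = 5000L
    def watermark(maxSeenTs: Long): Long = maxSeenTs - outOfOrdernessMs - 1

    // A tumbling event-time window [0, 5000) fires once the watermark
    // reaches the window end minus one, i.e. watermark >= 4999.
    val windowEnd = 5000L
    def windowFires(maxSeenTs: Long): Boolean = watermark(maxSeenTs) >= windowEnd - 1

    // So the first window cannot fire until an event with timestamp >= 10000
    // arrives -- if your events stop before that, the window never closes.
    assert(!windowFires(9999L))
    assert(windowFires(10000L))
    ```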

    By the way, if your events are in order on a per-partition basis, and you call assignTimestampsAndWatermarks on the FlinkKafkaConsumer (as you are currently doing), then you can use forMonotonousTimestamps rather than forBoundedOutOfOrderness; per-partition watermarking has some significant advantages.
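    As a hedged sketch of that variant (which also guards against the idle-partition case mentioned above; the one-minute idleness value is an illustrative choice, not a recommendation):

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.WatermarkStrategy

    // Per-partition watermarks: only valid if timestamps are ascending
    // within each Kafka partition. withIdleness prevents a partition that
    // stops receiving data from stalling the overall watermark.
    source.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forMonotonousTimestamps[String]()
        .withIdleness(Duration.ofMinutes(1)))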