Search code examples
apache-kafkaapache-flinkflink-streaming

Scala: Cannot resolve overloaded methods (Flink WatermarkStrategy)


I'm following Flink's documentation on how to use WatermarkStrategy with KafkaConsumer. The code is shown below

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

val stream: DataStream[MyType] = env.addSource(kafkaSource)

Anytime I try to compile the code above I get an error saying

error: overloaded method value assignTimestampsAndWatermarks with alternatives:

error: overloaded method value assignTimestampsAndWatermarks with alternatives:
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.api.common.eventtime.WatermarkStrategy[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String]
[ERROR]  cannot be applied to (org.apache.flink.api.common.eventtime.WatermarkStrategy[Nothing])
[ERROR]         consumer.assignTimestampsAndWatermarks(

Solution

  • The code below returns WatermarkStrategyy[Nothing] instead of WatermarkStrategy[String]

      WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
    

    I solved this by using this code

    val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
    watermark: Watermark[String] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
    kafkaSource.assignTimestampsAndWatermarks(watermark)