Tags: apache-flink, flink-streaming

Watermark strategy not working for Kafka Consumer in Flink


I'm new to Flink, and I'm running into some problems when defining watermarks.

Let's start with the Kafka consumer. The deserializer is JSONKeyValueDeserializationSchema, so there is no custom parsing.

val kafkaConsumer: FlinkKafkaConsumer[ObjectNode] = new FlinkKafkaConsumer[ObjectNode](
  kafkaTopic,
  new JSONKeyValueDeserializationSchema(false),
  properties
)

If a sink is applied to this code, it works just fine. The problem is that a watermark strategy is needed to handle out-of-order events. This is the strategy I wrote:

val watermarkStrategy: WatermarkStrategy[ObjectNode] = WatermarkStrategy
  .forBoundedOutOfOrderness[ObjectNode](Duration.ofSeconds(100))
  .withTimestampAssigner(
    new SerializableTimestampAssigner[ObjectNode] {
      override def extractTimestamp(record: ObjectNode, recordTimestamp: Long): Long = {
        Instant.parse(record.get("value").get("content").get("timestamp").asText()).getEpochSecond
      }
    })

I ended up with this code after some research, but it is not working. These are my questions:

  • Is ObjectNode the best option here? Is there any other option?
  • The timestamp field is a string in the ISO 8601 standard, so I have to parse it to a Long. Is this the best way to do that, or is there a better one?
  • Are SerializableTimestampAssigner and forBoundedOutOfOrderness the right things to use?

Solution

  • The problem was my parse: getEpochSecond returns whole seconds, but Flink expects event timestamps in epoch milliseconds. Switching to a parse that yields milliseconds fixed it:

    dateFormat.parse(record.get("value").get("content").get("timestamp").asText()).getTime
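Since the field is already ISO 8601, a thread-safe alternative to dateFormat is java.time.Instant, which parses that format directly and yields the epoch milliseconds Flink expects. A minimal sketch, using a hypothetical sample value in place of the real record field:

```scala
import java.time.Instant

// Hypothetical ISO-8601 value, standing in for
// record.get("value").get("content").get("timestamp").asText()
val raw = "2024-05-01T12:00:00Z"

// toEpochMilli yields the milliseconds Flink expects as an event timestamp;
// getEpochSecond (as in the original strategy) would be off by a factor of 1000.
val eventTime: Long = Instant.parse(raw).toEpochMilli
println(eventTime)
```

Inside the SerializableTimestampAssigner, the same Instant.parse(...).toEpochMilli expression can replace the dateFormat call, with no need for a shared date-format instance.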