I'm new to Flink, and I'm running into problems when defining watermarks.
Let's start with the Kafka consumer. The deserialization schema is JSONKeyValueDeserializationSchema, so there is no custom parsing.
val kafkaConsumer: FlinkKafkaConsumer[ObjectNode] = new FlinkKafkaConsumer[ObjectNode](
  kafkaTopic,
  new JSONKeyValueDeserializationSchema(false),
  properties
)
If I attach a sink to this consumer, everything works fine. The problem is that I need watermarks to handle out-of-order events. This is the strategy I wrote:
val watermarkStrategy: WatermarkStrategy[ObjectNode] = WatermarkStrategy
  .forBoundedOutOfOrderness[ObjectNode](Duration.ofSeconds(100))
  .withTimestampAssigner(
    new SerializableTimestampAssigner[ObjectNode] {
      override def extractTimestamp(record: ObjectNode, recordTimestamp: Long): Long = {
        Instant.parse(record.get("value").get("content").get("timestamp").asText()).getEpochSecond
      }
    })
I ended up with this code after doing some research, but it does not work. These are my questions:
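Update: the problem was my timestamp parsing. The timestamp assigner has to return epoch milliseconds, but getEpochSecond returns seconds. Parsing the field with a date format and calling getTime returns milliseconds: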
dateFormat.parse(record.get("value").get("content").get("timestamp").asText()).getTime
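To make the unit mismatch concrete, here is a minimal sketch that uses only the JDK, so it can be run without Flink. The sample timestamp, the object name `TimestampUnits`, and the `SimpleDateFormat` pattern are assumptions for illustration: the post does not show the actual field format or how `dateFormat` was defined.

```scala
import java.text.SimpleDateFormat
import java.time.Instant
import java.util.TimeZone

object TimestampUnits {
  // Hypothetical sample value; the real field format is not shown in the post.
  val sample: String = "2021-03-01T12:00:00Z"

  // Seconds since the epoch: the unit the original assigner returned.
  def epochSeconds(text: String): Long = Instant.parse(text).getEpochSecond

  // Milliseconds since the epoch: the unit Flink uses for event timestamps.
  def epochMillis(text: String): Long = Instant.parse(text).toEpochMilli

  // Assumed stand-in for the post's `dateFormat`; pattern and time zone are guesses.
  // A new instance is created per call because SimpleDateFormat is not thread-safe.
  def epochMillisViaDateFormat(text: String): Long = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
    dateFormat.parse(text).getTime
  }

  def main(args: Array[String]): Unit = {
    println(s"seconds: ${epochSeconds(sample)}")
    println(s"millis:  ${epochMillis(sample)}")
    println(s"millis via SimpleDateFormat: ${epochMillisViaDateFormat(sample)}")
  }
}
```

If the field is an ISO-8601 instant, as the original `Instant.parse` call suggests, replacing `.getEpochSecond` with `.toEpochMilli` in the assigner should have the same effect as the `dateFormat` version. With second-resolution values, Flink would read every event as falling in January 1970 and events one second apart as one millisecond apart, so the 100-second out-of-orderness bound would not behave as intended.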