Search code examples
javacsvapache-flinkflink-streamingflink-batch

watermarks not increasing in flink


so I'm trying to create my own windowing scheme thought the use of unkeyed processFunctions. I'm using a source and would like to use watermarks. My current implementation of watermarks is as follows

this.watermarkStrategy = WatermarkStrategy
                .<EventBasic>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);

I've created my source as follows

DataStream<EventBasic> mainStream = env.readTextFile(csvFilePath)
            .map(new MapFunction<String, EventBasic>() {
                @Override
                public EventBasic map(String line) throws Exception {
                    String[] parts = line.split(",");
                    if (parts.length == 3) {
                        String key = parts[0];
                        int valueInt = Integer.parseInt(parts[1]);
                        long valueTimeStamp = Long.parseLong(parts[2]);
                        return new EventBasic(key, valueInt, valueTimeStamp);
                    } else {
                        return null;
                    }
                }
            }).setParallelism(3).assignTimestampsAndWatermarks(watermarkStrategy).name("source");

this source function reads a CSV file that has the following format:

key,val,timestamp
A,0,500
C,1,500
A,2,500
A,3,500
A,4,500
B,5,500
A,6,500
H,7,500
...
a,100,1500

With timestamps increasing monotonously

when observing immediately (i created a dummy processfunction to observe that my timestamps were working) I observe the value -9223372036854775808 which constantly. This means that the watermark generation doesn't know when to add a new watermark.

I've also tried the following watermark strategy which lead to the same output:

this.watermarkStrategy = WatermarkStrategy
                .<EventBasic>forBoundedOutOfOrderness(Duration.ofMillis(500))
                .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);

I don't know what my issue could be and I've tried looking everywhere but nothing seems to change.


Solution

  • So it turns out the issue why the watermarks were not increasing, but the timestamps were working correctly is due to the fact that I had set the environment to BATCH mode (env.setRuntimeMode(RuntimeExecutionMode.STREAMING)) therefore with flink they believe that since the data is known in advance there is no need for watermarks.

    src:https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution_mode/

    I hope that this will help anyone who ever falls on this issue.