public static long jobStartTimeStamp;

public static void main(String[] args) {
    // Record job start time
    jobStartTimeStamp = System.currentTimeMillis() + 120000;
    System.out.println("StartTime" + jobStartTimeStamp);
    ...
    if (isOutputTagEnable) {
        SinkFunction<MonitorDataHolder> lateMonitorDataProducer =
                new FlinkKafkaProducer010<MonitorDataHolder>(
                        properties.getProperty("lateMonitorDataSinkTopic", "HaMonitor_MonitorDataDiscard"),
                        new LateMonitorDataSerializationSchema(),
                        kafkaSinkConfig);
        monitorDataCompute
                .getSideOutput(outputTag)
                .filter(new FilterFunction<MonitorDataHolder>() {
                    @Override
                    public boolean filter(MonitorDataHolder value) throws Exception {
                        return value.getCurrentTimeStamp() > jobStartTimeStamp;
                    }
                })
                .addSink(lateMonitorDataProducer)
                .setParallelism(Integer.parseInt(properties.getProperty("lateMonitorDataSinkParallelism", "3")))
                .uid("SinkLateMonitorData");
    }
}
Why is jobStartTimeStamp always equal to 0 in the filter operator?
How should I solve it?
This is because the main method runs in a different JVM from the one(s) executing the instances of your filter function. The main method runs in the Flink client, which usually executes outside the cluster (in some deployments it runs inside the Job Manager process), while the instances of the filter function execute in the Task Managers. Flink serializes the filter function in the client and ships it to the Task Managers, but Java serialization does not include static fields, so in each Task Manager JVM jobStartTimeStamp still has its default value of 0.
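To see the mechanism in isolation, here is a minimal stand-alone sketch (no Flink involved; the class names are made up for illustration). It serializes an object whose method reads a static field, then resets the static field before deserializing, which simulates a fresh Task Manager JVM where main() never ran:

```java
import java.io.*;

public class StaticNotSerializedDemo {
    static long jobStartTimeStamp; // per-JVM state, never written to the stream

    // Analogue of the anonymous filter: it reads the static field directly
    static class Holder implements Serializable {
        long read() { return jobStartTimeStamp; } // returns whatever THIS JVM holds
    }

    public static void main(String[] args) throws Exception {
        jobStartTimeStamp = 42L; // set in the "client"

        // Serialize, as Flink does when shipping functions to Task Managers
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new Holder());

        // Simulate a Task Manager JVM: the class is loaded fresh, so the
        // static field has its default value
        jobStartTimeStamp = 0L;

        Holder shipped = (Holder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(shipped.read()); // prints 0, not 42
    }
}
```

The serialized bytes contain only instance state; the 42 set in the "client" never travels with the object.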
One way to accomplish what you're after is to copy the value of jobStartTimeStamp into an effectively final local variable before building the pipeline. An anonymous class implementing an interface such as FilterFunction cannot take constructor arguments, but it can capture local variables, and captured values are serialized along with the function:

final long startTime = jobStartTimeStamp;
...
.filter(new FilterFunction<MonitorDataHolder>() {
    @Override
    public boolean filter(MonitorDataHolder value) throws Exception {
        return value.getCurrentTimeStamp() > startTime;
    }
})

Alternatively, define a named FilterFunction implementation that receives the timestamp through its constructor and stores it in an instance field.
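The named-class variant looks like this. It is a minimal, self-contained sketch: the FilterFunction interface and MonitorDataHolder class below are simplified stand-ins for Flink's org.apache.flink.api.common.functions.FilterFunction and the question's own class, so the example compiles and runs on its own:

```java
import java.io.Serializable;

public class StartTimeFilterDemo {

    // Stand-in for org.apache.flink.api.common.functions.FilterFunction
    interface FilterFunction<T> extends Serializable {
        boolean filter(T value) throws Exception;
    }

    // Stand-in for the question's MonitorDataHolder
    static class MonitorDataHolder {
        private final long currentTimeStamp;
        MonitorDataHolder(long ts) { this.currentTimeStamp = ts; }
        long getCurrentTimeStamp() { return currentTimeStamp; }
    }

    // The timestamp lives in an instance field, so it is serialized together
    // with the function object and travels to the Task Managers
    static class StartTimeFilter implements FilterFunction<MonitorDataHolder> {
        private final long jobStartTimeStamp;

        StartTimeFilter(long jobStartTimeStamp) {
            this.jobStartTimeStamp = jobStartTimeStamp;
        }

        @Override
        public boolean filter(MonitorDataHolder value) {
            return value.getCurrentTimeStamp() > jobStartTimeStamp;
        }
    }

    public static void main(String[] args) throws Exception {
        StartTimeFilter filter = new StartTimeFilter(1_000L);
        System.out.println(filter.filter(new MonitorDataHolder(2_000L))); // true
        System.out.println(filter.filter(new MonitorDataHolder(500L)));   // false
    }
}
```

In the real pipeline you would write `.filter(new StartTimeFilter(jobStartTimeStamp))`, passing the value while still in the client JVM.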