In Apache Flink, global variables can be established and accessed within the main function


public static long jobStartTimeStamp;

public static void main(String[] args) {

  //Record job start time
  jobStartTimeStamp = System.currentTimeMillis() + 120000;
  System.out.println("StartTime"+jobStartTimeStamp);

...

  if (isOutputTagEnable) {
    SinkFunction<MonitorDataHolder> lateMonitorDataProducer =
      new FlinkKafkaProducer010<MonitorDataHolder>(properties.getProperty("lateMonitorDataSinkTopic", "HaMonitor_MonitorDataDiscard"), new LateMonitorDataSerializationSchema(), kafkaSinkConfig);
    monitorDataCompute
        .getSideOutput(outputTag)
        .filter(new FilterFunction<MonitorDataHolder>() {
          @Override
          public boolean filter(MonitorDataHolder value) throws Exception {
            return value.getCurrentTimeStamp() > jobStartTimeStamp;
          }
        })
        .addSink(lateMonitorDataProducer)
        .setParallelism(Integer.parseInt(properties.getProperty("lateMonitorDataSinkParallelism", "3")))
        .uid("SinkLateMonitorData");
  }
}

Why is jobStartTimeStamp always equal to 0 in the filter operator?

How should I solve it?


Solution

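  • This happens because the main method runs in a different JVM from the one(s) running the instances of your filter function. The main method runs in the Flink client, which usually executes outside the cluster (in some deployments it runs in the Job Manager process), while the filter function instances execute in the Task Managers. Static fields are not serialized along with the function object, so the assignment made in main never reaches the Task Managers; there the static long simply holds its default value, 0.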

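    One way to accomplish what you're after is to pass the value of jobStartTimeStamp into the filter function through its constructor, so that it travels with the serialized function as an instance field:

            .filter(new StartTimeFilter(jobStartTimeStamp))

    StartTimeFilter is a hypothetical name here: you'll have to write the filter as a named class that implements FilterFunction<MonitorDataHolder> and stores the timestamp in a field, because an anonymous class created from an interface cannot take constructor arguments.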
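
The difference between reading a static field and carrying the value as an instance field can be reproduced without a cluster. The following is only a sketch in plain Java, not Flink code: SerializableFilter is a stand-in for Flink's FilterFunction, StartTimeFilter and StaticReadingFilter are hypothetical names, the timestamps are plain Long values instead of MonitorDataHolder, and resetting the static to 0 merely imitates a Task Manager JVM in which main never ran.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticVsFieldDemo {

    // Stand-in (assumption) for Flink's FilterFunction<T>, which is also Serializable.
    interface SerializableFilter<T> extends Serializable {
        boolean filter(T value) throws Exception;
    }

    // Assigned in main(); the value exists only in the JVM that ran main().
    static long jobStartTimeStamp;

    // Mirrors the question: reads the static field, which is not part of the serialized object.
    static class StaticReadingFilter implements SerializableFilter<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(Long timestamp) {
            return timestamp > jobStartTimeStamp;
        }
    }

    // Mirrors the answer: the threshold is an instance field, so it is serialized with the object.
    static class StartTimeFilter implements SerializableFilter<Long> {
        private static final long serialVersionUID = 1L;
        private final long startTime;

        StartTimeFilter(long startTime) {
            this.startTime = startTime;
        }

        @Override
        public boolean filter(Long timestamp) {
            return timestamp > startTime;
        }
    }

    // Serializes and deserializes an object, roughly what happens when a function is shipped to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        jobStartTimeStamp = 1_000L;
        SerializableFilter<Long> viaStatic = roundTrip(new StaticReadingFilter());
        SerializableFilter<Long> viaField = roundTrip(new StartTimeFilter(jobStartTimeStamp));

        // Imitate a fresh worker JVM, where the static field was never assigned.
        jobStartTimeStamp = 0L;

        System.out.println(viaStatic.filter(500L)); // true: 500 is compared against 0
        System.out.println(viaField.filter(500L));  // false: 500 is compared against 1000
    }
}
```

In an actual job, the same shape would apply with StartTimeFilter implementing org.apache.flink.api.common.functions.FilterFunction<MonitorDataHolder> and comparing value.getCurrentTimeStamp() against the stored field.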