
Scheduled Task with Apache Flink


I have a Flink job with parallelism 5 (for now!). One of its RichFlatMapFunction operators opens a file in the open(Configuration parameters) method. The flatMap operation does not open anything; it only reads the file to search for something (a utility class provides a method like utilityClass.searchText("abc")). Here is the boilerplate code:

public class MyFlatMap extends RichFlatMapFunction<...> {

    private MyUtilityFile myFile;

    @Override
    public void open(Configuration parameters) throws Exception {
        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }

}

A Python script rewrites this file every day at a specific time, so the flatMap operator also needs to open the newly created file.

I thought this could be done with a ScheduledExecutorService backed by a single-threaded pool.

I cannot open the file on every flatMap call because it is large.

Here is the boilerplate code I am trying to write:

public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private MyUtilityFile myFile;

    @Override
    public void run() {
        myFile.Open("fileLocation");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);

        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText("abc");
        if (text != null) {
            // take an action
        } else {
            // another action
        }
    }

}

Is this boilerplate okay in a Flink environment? If not, how can I reopen the file on a schedule? (Event-driven alternatives, such as having the script send a Kafka event after updating the file and consuming that event in Flink, are not an option.)


Solution

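  • One option is to implement Flink's ProcessingTimeCallback interface directly and use processing-time timers to reload the file periodically. Unlike a thread from your own ScheduledExecutorService, the timer callback is invoked by the Flink task itself and does not run concurrently with flatMap, so a reload cannot race with a search: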

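    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    // Package names as in Flink 1.x; they may differ in newer releases.
    import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

    public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback {

        private static final long RELOAD_INTERVAL_MS = 60 * 60 * 1000L; // one hour

        private MyUtilityFile myFile;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Register the first reload one hour from now.
            final long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + RELOAD_INTERVAL_MS, this);

            myFile.Open("fileLocation");
        }

        @Override
        public void flatMap(...) throws Exception {
            String text = myFile.searchText("abc");
            if (text != null) {
                // take an action
            } else {
                // another action
            }
        }

        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            // Reload the (possibly updated) file, then schedule the next reload.
            myFile.Open("fileLocation");

            final long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + RELOAD_INTERVAL_MS, this);
        }

        // RichFlatMapFunction has no getProcessingTimeService() of its own; in a streaming job
        // the timer service is reachable through the (internal) StreamingRuntimeContext.
        private ProcessingTimeService getProcessingTimeService() {
            return ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
        }
    }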
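If you would rather keep the ScheduledExecutorService idea from the question, the following plain-Java sketch (no Flink dependency) shows one way to make it safer. Two problems in the question's version motivate it: the executor is created in a field initializer, so Flink would try to serialize it together with the function when the job is submitted (executors are not serializable), and the scheduler thread reopens myFile while flatMap may be reading it. In the sketch the thread is only created in start(), and each reload builds a complete new snapshot that is published through a volatile field, so a reader sees either the old or the new contents, never a half-loaded object. It also skips the reload when the file's modification time has not changed. ReloadableFile, reloadIfChanged and start are hypothetical names, and the sketch assumes the file fits in memory as lines and that searchText is a plain substring search.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for MyUtilityFile that can be reloaded while it is being read. */
class ReloadableFile implements AutoCloseable {

    /** Immutable view of the file contents; replaced as a whole, never modified in place. */
    private static final class Snapshot {
        final List<String> lines;
        final FileTime modified;

        Snapshot(List<String> lines, FileTime modified) {
            this.lines = lines;
            this.modified = modified;
        }
    }

    private final Path path;
    // volatile: a reader always sees the most recently published, fully built snapshot.
    private volatile Snapshot current;
    // Created in start(), not in a field initializer, so constructing the object starts no thread.
    private ScheduledExecutorService scheduler;

    ReloadableFile(Path path) throws IOException {
        this.path = path;
        this.current = load();
    }

    private Snapshot load() throws IOException {
        // Read the timestamp first: if the file changes during the read,
        // the next check sees a newer timestamp and reloads again.
        FileTime modified = Files.getLastModifiedTime(path);
        List<String> lines = Collections.unmodifiableList(Files.readAllLines(path));
        return new Snapshot(lines, modified);
    }

    /** Reloads only when the modification time changed; returns true if it reloaded. */
    synchronized boolean reloadIfChanged() throws IOException {
        if (Files.getLastModifiedTime(path).equals(current.modified)) {
            return false;
        }
        current = load(); // build the new snapshot completely, then publish it with one write
        return true;
    }

    /** Returns the first line containing text, or null (like searchText in the question). */
    String searchText(String text) {
        Snapshot snapshot = current; // read the volatile field once
        for (String line : snapshot.lines) {
            if (line.contains(text)) {
                return line;
            }
        }
        return null;
    }

    /** Starts one background thread that checks the file for changes at a fixed rate. */
    void start(long period, TimeUnit unit) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                reloadIfChanged();
            } catch (IOException e) {
                // Keep serving the previous snapshot if the file cannot be read right now.
            }
        }, period, period, unit);
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

In MyFlatMap you would declare the field as `private transient ReloadableFile myFile;`, create it in open() with `myFile = new ReloadableFile(Paths.get("fileLocation"));` followed by `myFile.start(1, TimeUnit.HOURS);`, and call `myFile.close()` from the function's close() method so the thread stops when the task is cancelled or restarted. The sketch does not protect against reading a file that the Python script is still writing; having the script write to a temporary file and then rename it into place avoids that on most file systems.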