scala, apache-flink, flink-streaming

Flink source for periodic updates


I'm trying to implement an external config for a long-running Flink job. My idea is to create a custom source that periodically (every 5 minutes) polls a JSON-encoded config from an external service over HTTP.

How do I create a source that performs an action every N minutes? How can I rebroadcast this config to all executors?


Solution

  • First, define an event class that holds all the attributes of your event stream, together with its getters, setters, and other methods. An example of such a class:

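    public class qrsIntervalStreamEvent {

        // Public fields plus a public no-argument constructor let Flink treat this class as a POJO type.
        public Integer Sensor_id;
        public long time;
        public Integer qrsInterval;

        public qrsIntervalStreamEvent() {
        }

        public qrsIntervalStreamEvent(Integer sensor_id, long time, Integer qrsInterval) {
            Sensor_id = sensor_id;
            this.time = time;
            this.qrsInterval = qrsInterval;
        }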
    
    
        public Integer getSensor_id() {
            return Sensor_id;
        }
    
        public void setSensor_id(Integer sensor_id) {
            Sensor_id = sensor_id;
        }
    
        public long getTime() {
            return time;
        }
    
        public void setTime(long time) {
            this.time = time;
        }
    
        public Integer getQrsInterval() {
            return qrsInterval;
        }
    
        public void setQrsInterval(Integer qrsInterval) {
            this.qrsInterval = qrsInterval;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof qrsIntervalStreamEvent)) return false;
    
            qrsIntervalStreamEvent that = (qrsIntervalStreamEvent) o;
    
            if (getTime() != that.getTime()) return false;
            if (getSensor_id() != null ? !getSensor_id().equals(that.getSensor_id()) : that.getSensor_id() != null)
                return false;
            return getQrsInterval() != null ? getQrsInterval().equals(that.getQrsInterval()) : that.getQrsInterval() == null;
        }
    
        @Override
        public int hashCode() {
            int result = getSensor_id() != null ? getSensor_id().hashCode() : 0;
            result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
            result = 31 * result + (getQrsInterval() != null ? getQrsInterval().hashCode() : 0);
            return result;
        }
    
    
        @Override
        public String toString() {
            return "StreamEvent{" +
                    "Sensor_id=" + Sensor_id +
                    ", time=" + time +
                    ", qrsInterval=" + qrsInterval +
                    '}';
        }
    
    
    } //class
    

    Now suppose you want to emit these events at a rate of x events per 5 seconds. You can write a source along these lines:

    import java.util.Random;

    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class Qrs_interval_Gen extends RichParallelSourceFunction<qrsIntervalStreamEvent> {

        // Cleared by cancel() so that run() can leave its loop.
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<qrsIntervalStreamEvent> sourceContext) throws Exception {
            Random random = new Random();

            int inputRate = 10;                      // events per 5-second window
            long sleepTime = 1000L * 5 / inputRate;  // pause between events, in milliseconds

            for (int i = 0; running && i <= 100000; i++) {
                int sensorId = 1;
                // Uniform random value in [10, 20].
                int qrsInterval = 10 + random.nextInt((20 - 10) + 1);
                // Synthetic timestamp; use System.currentTimeMillis() for wall-clock time.
                long currentTime = i;

                // Throttle the output. An InterruptedException propagates out of run().
                Thread.sleep(sleepTime);

                sourceContext.collect(new qrsIntervalStreamEvent(sensorId, currentTime, qrsInterval));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
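
The same sleep-in-a-loop pattern can be tried outside Flink. The following is only a minimal sketch under stated assumptions: `PeriodicPoller`, its `fetcher` (a stand-in for the HTTP call from the question) and its `out` consumer (a stand-in for `sourceContext.collect`) are hypothetical names, not Flink API. It shows a loop that fetches once per interval and stops when `cancel()` is called.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, Flink-free stand-in for a polling source.
final class PeriodicPoller {
    private final Supplier<String> fetcher;   // stands in for the HTTP request
    private final long intervalMillis;        // e.g. 5 * 60 * 1000L for five minutes
    private volatile boolean running = true;  // cleared by cancel()

    PeriodicPoller(Supplier<String> fetcher, long intervalMillis) {
        this.fetcher = fetcher;
        this.intervalMillis = intervalMillis;
    }

    // Emits one fetched value per interval until cancel() is called
    // or the calling thread is interrupted.
    void run(Consumer<String> out) {
        while (running) {
            out.accept(fetcher.get());
            if (!running) {
                break;  // cancelled during the fetch: skip the final sleep
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve the interrupt for the caller
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

Inside a Flink source, the body of `run` would call `sourceContext.collect(...)` instead of `out.accept(...)`. Sending the resulting stream to every parallel instance (for example with `DataStream#broadcast()`) is a separate step that this sketch does not cover.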
    

    The whole rate logic is in the sleep time. To send x events per second, the sleep time between events is the inverse of that rate. For example, to send 10 events per second:

    Sleeptime = 1000 / 10 = 100 milliseconds

    Similarly, to send 10 events every 5 seconds, the sleep time is

    Sleeptime = 1000 * 5 / 10 = 500 milliseconds
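
The arithmetic above can be wrapped in a small helper. This is a sketch only: the class `SleepTime` and the method `millisBetweenEvents` are hypothetical names introduced for illustration. Integer division truncates, and the result ignores the time spent producing each event.

```java
// Hypothetical helper for the sleep-time arithmetic.
final class SleepTime {
    private SleepTime() {
    }

    // Milliseconds to sleep between events so that `events` events
    // are emitted every `windowSeconds` seconds.
    static long millisBetweenEvents(int events, int windowSeconds) {
        if (events <= 0 || windowSeconds <= 0) {
            throw new IllegalArgumentException("events and windowSeconds must be positive");
        }
        return 1000L * windowSeconds / events;
    }
}
```

For instance, `SleepTime.millisBetweenEvents(10, 1)` gives 100 and `SleepTime.millisBetweenEvents(10, 5)` gives 500, matching the two cases above. One event every 5 minutes, as in the question, would be `SleepTime.millisBetweenEvents(1, 300)`, which is 300000 milliseconds.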

    Hope it helps. Let me know if you have any questions.