Tags: scala, apache-flink, amazon-kinesis-analytics

How to update/refresh a parameter in Flink application


I have a Flink application running on the AWS Kinesis Analytics service. I need to filter values in a data stream against a threshold, which I read from AWS Systems Manager Parameter Store. So far, I have this:

  • In my Main class:
    val threshold: Int = ssmParameter.getParameterRequest(ssmClient, "/kinesis/threshold").toInt

    val kinesis_deserialization_schema = new KinesisDeserialization[ID]
    val KinesisConsumer = new FlinkKinesisConsumer[ID](
      "Data-Stream",
      kinesis_deserialization_schema,
      consumerProps
    )
    val KinesisSource = env.addSource(KinesisConsumer).name("Kinesis Data")
    val valid_data = KinesisSource
      .filter(new MyFilter[ID](threshold))
      .name("FilterData")
      .uid("FilterData")
  • Filter class:
    import cl.mydata.InputData
    import org.apache.flink.api.common.functions.FilterFunction

    // Keeps only the records whose value exceeds the threshold passed at construction time.
    class MyFilter[ID <: InputData](threshold: Int) extends FilterFunction[ID] {
      override def filter(value: ID): Boolean =
        value.myvalue > threshold
    }

This works fine, but the threshold is read only once, when the job starts. I need to refresh it every hour, because my client can change its value.


Solution

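  • Perhaps you can implement the ProcessingTimeCallback interface, which supports timer operations, and update the threshold in the onProcessingTime function. A plain FilterFunction has no access to the processing-time service, so in this sketch MyFilter is written as a custom operator and is attached with DataStream.transform(...) instead of filter(...).

    // ProcessingTimeCallback lives in a different package depending on the Flink version,
    // so import it from the version you run.
    import cl.mydata.InputData;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    public class MyFilter<IN extends InputData> extends AbstractStreamOperator<IN>
            implements OneInputStreamOperator<IN, IN>, ProcessingTimeCallback {
        private static final long REFRESH_INTERVAL_MS = 3_600_000L; // one hour
        int threshold;

        @Override
        public void open() throws Exception {
            super.open();
            threshold = XXXX; // load the current value when the operator starts

            final long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + REFRESH_INTERVAL_MS, this);
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            // Forward only the records above the threshold; myvalue() is the Scala accessor
            // for the field used in the question.
            if (element.getValue().myvalue() > threshold) {
                output.collect(element);
            }
        }

        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            threshold = XXXX; // re-read the current value

            // A timer fires only once, so register the next one.
            final long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + REFRESH_INTERVAL_MS, this);
        }
    }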
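
  • Another option, if you would rather keep a normal filter, is to refresh the value from a background thread using only the JDK scheduler. The following is a minimal sketch of that idea, not a tested Flink integration: RefreshingThreshold and its loader argument are hypothetical names, and the loader stands in for whatever call reads the parameter (for example, the Parameter Store lookup from the question).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical helper: holds a threshold and re-reads it from a loader at a fixed interval. */
final class RefreshingThreshold implements AutoCloseable {
    private final IntSupplier loader;                 // assumption: wraps the parameter lookup
    private final ScheduledExecutorService scheduler;
    private volatile int value;                       // written by the refresh thread, read by the filter

    RefreshingThreshold(IntSupplier loader, long period, TimeUnit unit) {
        this.loader = loader;
        this.value = loader.getAsInt();               // initial load; fails fast if the lookup fails
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "threshold-refresh");
            t.setDaemon(true);                        // do not keep the JVM alive for this thread
            return t;
        });
        this.scheduler.scheduleAtFixedRate(this::refresh, period, period, unit);
    }

    /** Re-reads the value; if the loader throws, the previous value is kept. */
    void refresh() {
        try {
            value = loader.getAsInt();
        } catch (RuntimeException e) {
            // Swallowed on purpose: an exception escaping the task would cancel all later runs.
        }
    }

    int get() {
        return value;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

    In a Flink job, the holder would presumably be created in open() of a RichFilterFunction and closed in close(), because an executor is not serializable and cannot be shipped with the function; filter() would then compare against get(). Each parallel subtask would run its own refresh thread, so the subtasks may pick up a new value at slightly different times.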