Tags: apache-flink, flink-streaming

Object instances in relation to Flink parallelism & the apply method


  • First let me ask my question, and then could you please clarify my assumption about the apply method?

  • Question: If my application creates roughly 1,500,000 records every minute, and the Flink job reads these records from a Kafka consumer with, say, 15 or more different operators, could this logic create latency, backpressure, etc.? (You may assume that the parallelism is 16.)

public class Sample {
  // op1:
  kafkaSource
      .keyBy(something)
      .timeWindow(Time.minutes(1))
      .apply(new ApplySomething())
      .name("Name")
      .addSink(kafkaSink);

  // op2:
  kafkaSource
      .keyBy(something2)
      .timeWindow(Time.seconds(1)) // let's assume this one is in seconds
      .apply(new ApplySomething2())
      .name("Name")
      .addSink(kafkaSink);

  // ...

  // op16:
  kafkaSource
      .keyBy(something16)
      .timeWindow(Time.minutes(1))
      .apply(new ApplySomething16())
      .name("Name")
      .addSink(kafkaSink);
}
// ..
public class ApplySomething extends RichWindowFunction<Record, Result, Tuple, TimeWindow> {
    private AnyObject object;
    private int threshold; // 30, 40, 100, ... (a different value in each ApplySomething class)

    @Override
    public void open(Configuration parameters) throws Exception {
        object = new AnyObject();
    }

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Record> input, Collector<Result> out) throws Exception {
        int counter = 0;
        for (Record each : input) {
            counter += each.getValue();
            if (counter > threshold) {
                out.collect(each.getResult());
                return;
            }
        }
    }
}
  • If yes, should I use flatMap with state (RocksDB) instead of timeWindow?
  • My prediction is "YES". Let me explain why I am thinking like that:
    • If the parallelism is 16, then there will be 16 different instances of each individual ApplySomething1(), ApplySomething2() ... ApplySomething16(), and there will also be sixteen AnyObject() instances per ApplySomething..() class.
    • When the application runs, if the number of keyBy(something) partitions is larger than 16 (assume my application sees 1,000,000 different values of something per day), then some of the ApplySomething..() instances will handle multiple keys, so one apply() invocation has to wait for the for loops of other keys' windows to finish before it runs. Will this create latency?

Solution

  • Flink's time windows are aligned to the epoch (e.g., if you have a bunch of hourly windows, they will all trigger on the hour). So if you do intend to have a bunch of different windows in your job like this, you should configure them to have distinct offsets, so they aren't all being triggered simultaneously. Doing that will spread out the load. That will look something like this

    .window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15)))
    

    (or use TumblingEventTimeWindows as the case may be). This will create minute-long windows that trigger at 15 seconds after each minute.
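    The alignment arithmetic behind that offset can be sketched in plain Java. The helper below mirrors the modular formula Flink uses internally for window assignment; the class and method names here are illustrative, not Flink API:

    ```java
    // Sketch of the window-alignment arithmetic behind the offset parameter.
    // Flink's TimeWindow.getWindowStartWithOffset uses essentially this formula;
    // WindowOffsetDemo and windowStart are illustrative names, not Flink API.
    public class WindowOffsetDemo {
        // Start (ms since the epoch) of the window a timestamp falls into.
        static long windowStart(long timestampMs, long offsetMs, long windowSizeMs) {
            return timestampMs - (timestampMs - offsetMs + windowSizeMs) % windowSizeMs;
        }

        public static void main(String[] args) {
            long size = 60_000;   // 1-minute windows
            long offset = 15_000; // 15-second offset

            // An event at t = 130 s lands in the window [75 s, 135 s):
            long start = windowStart(130_000, offset, size);
            System.out.println(start);        // 75000
            System.out.println(start + size); // 135000 -> fires at :15 past the minute
        }
    }
    ```

    With offset 0 the same event would land in [120 s, 180 s), firing on the minute boundary; the offset only shifts where the boundaries fall, not the window length.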

    Whenever your use case permits, you should use incremental aggregation (via reduce or aggregate), rather than using a WindowFunction (or ProcessWindowFunction) that has to collect all of the events assigned to each window in a list before processing them as a sort of mini-batch.
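    The difference can be shown with a stand-alone sketch: with incremental aggregation the accumulator is updated per event as it arrives, so the window keeps O(1) state instead of buffering the full event list. The interface below mirrors the shape of Flink's AggregateFunction (createAccumulator / add / getResult / merge), but it is a simplified stand-in; in a real job you would implement org.apache.flink.api.common.functions.AggregateFunction and pass it to .aggregate(...):

    ```java
    import java.util.List;

    public class IncrementalAggDemo {
        // Simplified stand-in for Flink's AggregateFunction interface.
        interface Agg<IN, ACC, OUT> {
            ACC createAccumulator();
            ACC add(IN value, ACC acc);
            OUT getResult(ACC acc);
            ACC merge(ACC a, ACC b);
        }

        // Running sum; getResult reports whether the threshold was crossed.
        static class SumOverThreshold implements Agg<Integer, Long, Boolean> {
            private final long threshold;
            SumOverThreshold(long threshold) { this.threshold = threshold; }
            public Long createAccumulator() { return 0L; }
            public Long add(Integer value, Long acc) { return acc + value; }
            public Boolean getResult(Long acc) { return acc > threshold; }
            public Long merge(Long a, Long b) { return a + b; }
        }

        public static void main(String[] args) {
            Agg<Integer, Long, Boolean> agg = new SumOverThreshold(30);
            Long acc = agg.createAccumulator();
            for (int v : List.of(10, 12, 5, 9)) { // events arrive one at a time
                acc = agg.add(v, acc);            // only the running sum is stored
            }
            System.out.println(agg.getResult(acc)); // true: 36 > 30
        }
    }
    ```

    This replaces the for loop over Iterable<Record> in the ApplySomething classes: the work is spread over the whole window instead of happening in one burst when the window fires.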

    A keyed time window will keep its state in RocksDB, assuming you have configured RocksDB as your state backend. You don't need to switch to using a RichFlatMap to have access to RocksDB. (Moreover, since a flatMap can't use timers, I assume you would really end up using a process function instead.)
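    For completeness, a sketch of configuring RocksDB as the state backend (this uses the Flink 1.13+ class names; older versions use RocksDBStateBackend instead, and the checkpoint path is a placeholder):

    ```java
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Keyed window state (window contents and timers) now lives in RocksDB.
    env.setStateBackend(new EmbeddedRocksDBStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");
    ```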

    While any of the parallel instances of the window operator is busy executing its window function (one of the ApplySomethings) you are correct in thinking that that task will not be doing anything else -- and thus it will (unless it completes very quickly) create temporary backpressure. You will want to increase the parallelism as needed so that the job can satisfy your requirements for throughput and latency.
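    If only the window operators turn out to be the bottleneck, the parallelism can also be raised per operator rather than job-wide. A sketch using the asker's placeholder names (setParallelism is real DataStream API):

    ```java
    kafkaSource
        .keyBy(something)
        .timeWindow(Time.minutes(1))
        .apply(new ApplySomething())
        .setParallelism(32) // override the job-wide default of 16 for this operator only
        .name("Name")
        .addSink(kafkaSink);
    ```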