
Apache Flink: number of task slots vs env.setParallelism


Could you explain the difference between a task slot and parallelism in Apache Flink v1.9?

  • Here is my understanding so far:

    • Flink says that a TaskManager is the worker PROCESS, and that normally you should have one TaskManager per computer.
    • Let's say I have 3 computers, each with 16 CPU cores. Each computer will run one TaskManager, so I will have 3 TaskManagers.
    • I thought that if one computer has 16 CPU cores, then its TaskManager could create at most 16 task slots, so there would be CPU isolation. However, the Flink documentation says: "Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks."
    • Does that mean 16 slots = 16 threads? And can numberOfSlots be >= numberOfCpuCores?
  • If task slots are threads, could this lead to shared data access problems, race conditions, etc.? This is my first question.

  • My second question is the one from the beginning of my post: the difference between task slots and parallelism. I am talking about env.setParallelism(number).
    • Let's say my parallelism = 2.
    • Will each task slot (thread or whatever it is) then be executed with 2 threads?
      • If so, could this lead to shared data access problems, race conditions, etc.?
      • If not, what does parallelism mean?
  • Here is the example. In this example, do I need to take care when writing the apply() method because of the multi-threaded environment?

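import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// SensorReading, SensorSource and SensorTimeAssigner are not shown in the post;
// they are assumed to be available on the classpath of the asker's project.
public class AverageSensorReadings {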
 public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  int paralellism = env.getParallelism();
  int maxParal = env.getMaxParallelism();

  // ingest sensor stream
  DataStream < SensorReading > sensorData = env
   // SensorSource generates random temperature readings
   .addSource(new SensorSource())
   // assign timestamps and watermarks which are required for event time
   .assignTimestampsAndWatermarks(new SensorTimeAssigner());

  DataStream < SensorReading > avgTemp = sensorData
   // convert Fahrenheit to Celsius using an inlined map function
   .map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
   // organize stream by sensor
   .keyBy(r -> r.id)
   // group readings in 4 second windows
   .timeWindow(Time.seconds(4))
   // compute average temperature using a user-defined function
   .apply(new TemperatureAverager());

  // print result stream to standard out
  //avgTemp.print();
  System.out.println("paral: " + paralellism + " max paral: " + maxParal);
  // execute application
  env.execute("Compute average sensor temperature");
 }

 public static class TemperatureAverager extends RichWindowFunction < SensorReading, SensorReading, String, TimeWindow > {

  /**
   * apply() is invoked once for each window.
   *
   * @param sensorId the key (sensorId) of the window
   * @param window meta data for the window
   * @param input an iterable over the collected sensor readings that were assigned to the window
   * @param out a collector to emit results from the function
   */
  @Override
  public void apply(String sensorId, TimeWindow window, Iterable < SensorReading > input, Collector < SensorReading > out) {
   System.out.println("APPLY FUNCTION START POINT");
   System.out.println("sensorId: " + sensorId + "\n");

   // compute the average temperature
   int cnt = 0;
   double sum = 0.0;
   for (SensorReading r: input) {
    System.out.println("collected item: " + r);
    cnt++;
    sum += r.temperature;
   }
   double avgTemp = sum / cnt;
   System.out.println("APPLY FUNCTION END POINT");
   System.out.println("----------------------------\n\n");
   // emit a SensorReading with the average temperature
   out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
  }
 }
}

Solution

  • Typically each slot will run one parallel instance of your pipeline. The parallelism of the job is therefore the same as the number of slots required to run it. (By using slot sharing groups you can force specific tasks into their own slots, which would then increase the number of slots required.)
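
    The slot arithmetic described above can be sketched in plain Java. This is only an illustration, not Flink code: SlotMath and its methods are hypothetical names, and the sketch assumes that every operator stays in the default slot sharing group, in which case the job needs as many slots as its highest operator parallelism.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; nothing here is part of the Flink API.
final class SlotMath {

    // Slots offered by the cluster: number of TaskManagers times slots per TaskManager.
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Assumption: all operators share the default slot sharing group, so one slot
    // can hold one parallel instance of every operator. The job then needs as many
    // slots as the highest parallelism among its operators.
    static int requiredSlots(List<Integer> operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        // The asker's setup: 3 machines, one TaskManager each, 16 slots per TaskManager.
        int available = availableSlots(3, 16);
        // source, map and window operators, all running with parallelism 2
        int required = requiredSlots(Arrays.asList(2, 2, 2));
        System.out.println("available slots: " + available + ", required slots: " + required);
    }
}
```

    With the numbers from the question, a job with parallelism 2 would occupy only 2 of the available slots; the remaining slots stay free for other jobs.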

    Each task (which comprises one or more operators chained together) runs in one Java thread.

    A task manager can create as many slots as you want. Typical configurations use 1 CPU core per slot, but for pipelines with heavy processing requirements you might want to have 2 or more cores per slot, and for pipelines that are mostly idle you might go in the other direction and configure several slots per core.

    All of the tasks/threads running within a task manager will simply compete for the CPU resources that the task manager can get from the machine or container that hosts it.

    All state is local to the one operator instance (task) that uses it, so all access occurs within that one thread. The one place where there hypothetically could be a race condition is between the onTimer and processElement callbacks in a ProcessFunction, but these methods are synchronized, so you don't have to worry about this. Because all state access is local, this leads to high throughput, low latency, and high scalability.

    In your example, if the parallelism is two, then you will have two slots independently executing the same logic on different slices of your data. If they are using state, then this will be key-partitioned state that is managed by Flink, which you can think of as a sharded key/value store.

    In the case of the sensor data in time windows, you don't have to be concerned at all about the multi-threading. The keyBy will partition the data so that one instance will handle all of the events and windows for some of the sensors, and the other instance (assuming there are two) will handle the rest.
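
    To make the partitioning idea concrete, here is a minimal plain-Java sketch of what keyBy achieves. It is a simplification under stated assumptions: KeyRoutingSketch, instanceFor and countPerInstance are hypothetical names, and the routing uses a plain hash modulo the parallelism, whereas Flink itself assigns keys to key groups with its own hash function. The point it illustrates is the same: every key is handled by exactly one parallel instance, and each instance only touches its own state from its own thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a simplified stand-in for keyBy plus per-instance keyed state.
final class KeyRoutingSketch {

    // Simplified routing (NOT Flink's actual key-group assignment):
    // the same key always maps to the same parallel instance.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts events per key. Each parallel instance runs on its own thread and
    // owns a private map, so no locking is needed inside the instance.
    static List<Map<String, Integer>> countPerInstance(List<String> keys, int parallelism) {
        // Partition the input by key, as keyBy would.
        List<List<String>> inputs = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            inputs.add(new ArrayList<>());
        }
        for (String key : keys) {
            inputs.get(instanceFor(key, parallelism)).add(key);
        }

        List<Map<String, Integer>> states = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Map<String, Integer> state = new HashMap<>(); // state local to this instance
            List<String> input = inputs.get(i);
            states.add(state);
            Thread t = new Thread(() -> {
                for (String key : input) {
                    state.merge(key, 1, Integer::sum);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait for the instance to finish before reading its state
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return states;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("sensor_1", "sensor_2", "sensor_1", "sensor_3", "sensor_1");
        List<Map<String, Integer>> states = countPerInstance(events, 2);
        for (int i = 0; i < states.size(); i++) {
            System.out.println("instance " + i + " state: " + states.get(i));
        }
    }
}
```

    Because each sensor id lands in exactly one instance's map, a function such as apply() never sees the same key from two threads, which is why no synchronization is needed in the user code.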