
Relationship between number of subtasks in Flink and resource usage


I have the following workflow in Flink, running on a GCP cluster of 3 machines with 4 cores each.

HDFS-Scan -> Filter -> Aggregate

I initially set the parallelism of each operator to 12, so that each operator has 12 subtasks (I disabled chaining). I am trying to study the effect of the number of subtasks on resource usage. Here is what I did:

  1. Run 1: I made the filter operation logic expensive so that it causes backpressure. The total execution time was 212 seconds.

  2. Run 2: I kept the filter operator expensive. Since the scan operator was being backpressured anyway, I reduced its parallelism all the way to 4. Lower parallelism meant that scan produced data more slowly, but Filter was still the bottleneck. The execution time was still around 212 seconds.

  3. Run 3: I kept the filter operator expensive. I reduced scan operator's parallelism to 2. At this point, scan became the bottleneck and the execution time increased.

My question is about Run 2. I expected that reducing the parallelism of scan would affect the CPU usage of the VMs, in one of two ways: 1) CPU usage goes down because fewer scan subtasks are using the CPU, or 2) the Filter subtasks take up the CPU freed by the scan subtasks, in which case CPU usage would not go down but the execution time would. Neither of these happened.

Can someone help me understand this? Is there some other way to reason about what is happening?


Solution

  • I would expect the aggregate CPU effort expended across the collection of HDFS-Scan subtasks to be roughly the same in both Run 1 and Run 2. Whether there are 4 or 12 subtasks for this HDFS-Scan operator doesn't make much difference, since they are spending most of their time blocked, doing nothing while waiting for Flink's credit-based flow control to allocate buffers for them to work with.

    Just to make up some numbers, perhaps with 12 instances they are each blocked 75% of the time, while with 4 they are each blocked 25% of the time. Running 12 subtasks carries somewhat more overhead than running 4, but the overall performance is probably dominated by serialization/deserialization plus whatever the filter is doing.
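
    The made-up percentages above can be checked with a few lines of arithmetic. This is only a sketch of the reasoning: the blocked fractions are the illustrative numbers from the paragraph, not measurements, and `busy_thread_equivalents` is a hypothetical helper name.

```python
# Illustrative numbers from the text: (scan subtasks, fraction of time blocked).
# These are made up for the argument, not measured values.
configs = {"run 1": (12, 0.75), "run 2": (4, 0.25)}


def busy_thread_equivalents(subtasks: int, blocked_fraction: float) -> float:
    """Aggregate busy time of all subtasks, in units of one fully busy thread."""
    return subtasks * (1.0 - blocked_fraction)


results = {name: busy_thread_equivalents(n, b) for name, (n, b) in configs.items()}
for name, value in results.items():
    print(f"{name}: {value:.1f} busy-thread equivalents")
```

    Both configurations come out to the same aggregate amount of scan work, which is why the CPU usage of the VMs need not change between the two runs.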

    A (sub)task can be in one of three states:

    • idle, meaning it has nothing to do
    • backpressured, meaning it cannot do anything because it has no available output buffers
    • busy, meaning it is actively processing events
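
    One way to see which of these states dominates for a given subtask is to compare its per-second time metrics. The sketch below assumes a Flink version that reports `idleTimeMsPerSecond`, `backPressuredTimeMsPerSecond`, and `busyTimeMsPerSecond` per task; the readings are hypothetical, and `dominant_state` is an illustrative helper, not part of Flink.

```python
def dominant_state(idle_ms: float, backpressured_ms: float, busy_ms: float) -> str:
    """Return the state in which a subtask spent most of the last second."""
    times = {"idle": idle_ms, "backpressured": backpressured_ms, "busy": busy_ms}
    return max(times, key=times.get)


# Hypothetical readings in ms per second (assumed, not taken from the question).
scan_state = dominant_state(idle_ms=0, backpressured_ms=750, busy_ms=250)
filter_state = dominant_state(idle_ms=0, backpressured_ms=0, busy_ms=1000)
print(f"scan subtask: {scan_state}, filter subtask: {filter_state}")
```

    A pattern like this, with a mostly backpressured scan and a fully busy filter, would point to the filter as the bottleneck.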

    A task (an instance of an operator chain) corresponds to a JVM thread. All of the tasks in all of the slots in a single task manager are competing with each other for the resources (CPU, memory, etc) available to that task manager's JVM.

    While idle or backpressured, a task isn't consuming any (significant amount of) CPU time. Because there is a small, fixed amount of buffering in any Flink pipeline, any backpressure quickly propagates upstream and ends up throttling the sources.

    So in your case, whether there are 12 source tasks that are mostly doing nothing while backpressured, or 4 that are fairly busy, collectively those source tasks are producing the same volume of events (however many the downstream bottleneck can handle) and expending (approximately) the same amount of CPU in aggregate to get that done.
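
    A toy, tick-based model can make this concrete for all three runs. It is a deliberately simplified sketch, not a description of Flink's network stack: the record count, per-subtask scan rate, filter capacity, and buffer size are all assumed values chosen for illustration, and `simulate` is a hypothetical function.

```python
def simulate(scan_parallelism: int, total_records: int = 12_000, scan_rate: int = 10,
             filter_capacity: int = 30, buffer_capacity: int = 100):
    """Return (ticks to finish, aggregate busy ticks spent by the scan subtasks).

    All rates are assumptions: each scan subtask can emit `scan_rate` records
    per tick, and the filter as a whole handles `filter_capacity` per tick.
    """
    remaining = total_records  # records the scan has not read yet
    buffered = 0               # records waiting in the bounded buffer
    done = 0                   # records the filter has processed
    emitted = 0                # records emitted by all scan subtasks
    ticks = 0
    while done < total_records:
        ticks += 1
        # Sources emit only while the bounded buffer has room (backpressure).
        can_emit = min(scan_parallelism * scan_rate,
                       buffer_capacity - buffered, remaining)
        buffered += can_emit
        remaining -= can_emit
        emitted += can_emit
        # The filter drains the buffer at its fixed rate.
        processed = min(filter_capacity, buffered)
        buffered -= processed
        done += processed
    # Each record costs a scan subtask 1 / scan_rate of a tick of CPU.
    return ticks, emitted / scan_rate


for parallelism in (12, 4, 2):
    ticks, scan_cpu = simulate(parallelism)
    print(f"scan parallelism {parallelism}: {ticks} ticks, {scan_cpu} scan busy-ticks")
```

    With these assumed rates, scan parallelism 12 and 4 both finish in 400 ticks because the filter is the bottleneck, while parallelism 2 takes 600 ticks because the scan becomes the bottleneck. The aggregate scan work is the same in every case, since the same records have to be read either way.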