apache-flink flink-streaming flink-batch

Flink Operator stuck at 100% busy, how do I get it down?


I've deployed a Flink cluster as a YARN application. As part of the YARN configuration I assigned 32 vCores to each TaskManager, and I allocated 2 slots to each TaskManager.

Job Pipeline: Kafka source > Filter > RichMapFunction > TumblingWindow (Processing Time) > Sink

Job Graph

I've set the Parallelism and Max parallelism to 8 and ensured my keyBy function evenly distributes the messages to each subtask, as shown in the image below:

Messages distribution to Subtasks

Below is a visualisation of how I imagine the parallel instances of the operators are being placed, based on what I read in the Flink docs here.

Question 1: Is this correct? And secondly will each circle be executed on a separate thread?

The TaskManagers' CPU utilisation barely reaches 7%, so they are clearly not using all the available cores.

Question 2: In order to increase the CPU usage of a single TaskManager, should I increase the number of slots per TaskManager?

Question 3: Would increasing the parallelism eventually result in the Busy metric coming down?

My final question is about my use of the Aggregator Function in my Tumbling Processing Time Window. It has a hash map that stores the latest message for each id, where the key is the id and the value is the message. The output is then passed on to a RichAsyncFunc that sinks these values to a DB. The window length is 200ms, which lets me send only 5 DB queries per second for a given sink instance, as opposed to one per message, which would be far too many.
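
The accumulator described above can be sketched in plain Java, without any Flink dependency. This is only an illustration under assumptions: the class and method names (`LatestByIdAccumulator`, `add`, `merge`, `result`) are hypothetical and only loosely mirror the shape of Flink's `AggregateFunction`, and both the id and the message are assumed to be `String`s.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the window accumulator: keeps only the newest
// message seen for each id during one window.
class LatestByIdAccumulator {
    private final Map<String, String> latest = new HashMap<>();

    // A later message for the same id overwrites the earlier one.
    void add(String id, String message) {
        latest.put(id, message);
    }

    // Combine with another partial accumulator; entries from `other` win on conflict.
    void merge(LatestByIdAccumulator other) {
        latest.putAll(other.latest);
    }

    // Emitted once when the window fires: one batch instead of one write per message.
    List<String> result() {
        return new ArrayList<>(latest.values());
    }
}
```

If ten messages for three distinct ids arrive within one window, `result()` holds three messages, so the downstream write carries three values rather than ten.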

Question 4: Is this the right type of function for this job, or is there a more performant approach?

Thanks for reading!


Solution

  • Question 1: Is this correct? And secondly will each circle be executed on a separate thread?

    No, wherever there's a forwarding connection, those operators (circles) will run in the same thread (unless you disable operator chaining). So the source and filter are sharing a thread, and the window and sink are sharing a thread.
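
    As a rough illustration of the point above, the following plain-Java sketch groups a linear pipeline into chains: an operator stays in the same chain as the next one only when the connection between them is a forward one. This is a simplified model, not Flink's actual chaining logic (which also checks things such as equal parallelism and the chaining strategy). The `ChainModel` class and the operator names are hypothetical, and the two redistributing connections are assumed from the two keyBy calls in this job.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Simplified model of operator chaining for a linear pipeline.
    class ChainModel {
        // ops.get(i) is connected to ops.get(i + 1) by forward.get(i):
        // true = forward connection, false = redistribution (e.g. after keyBy).
        static List<List<String>> chains(List<String> ops, List<Boolean> forward) {
            List<List<String>> result = new ArrayList<>();
            List<String> current = new ArrayList<>();
            for (int i = 0; i < ops.size(); i++) {
                current.add(ops.get(i));
                boolean last = i == ops.size() - 1;
                // A non-forward edge (or the end of the pipeline) closes the chain.
                if (last || !forward.get(i)) {
                    result.add(current);
                    current = new ArrayList<>();
                }
            }
            return result;
        }
    }
    ```

    For the operators `source, filter, map, window, sink` with connections `forward, keyBy, keyBy, forward`, this model yields three chains: `[source, filter]`, `[map]` and `[window, sink]`. In this model each parallel pipeline therefore runs on three task threads rather than five.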

    Question 2: In order to increase the CPU usage of a single TaskManager, should I increase the number of slots per TaskManager?

    Yes, you could do that as a way of increasing the parallelism without adding more hardware.
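
    A back-of-the-envelope sketch of the numbers in the question may help here. It assumes Flink's default slot sharing, under which a job needs as many slots as its highest operator parallelism. `SlotMath` and its method names are hypothetical.

    ```java
    // Rough capacity arithmetic for slots and TaskManagers.
    class SlotMath {
        // TaskManagers needed to supply `parallelism` slots (ceiling division).
        static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
            return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
        }

        // Approximate number of cores in use, given a utilisation percentage.
        static double busyCores(int vCores, double utilisationPercent) {
            return vCores * utilisationPercent / 100.0;
        }
    }
    ```

    With parallelism 8 and 2 slots per TaskManager, `taskManagersNeeded(8, 2)` is 4, whereas with 8 slots per TaskManager it would be 1. `busyCores(32, 7)` is about 2.24, so at the reported 7% roughly two of the 32 vCores on a TaskManager are doing work.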

    Question 3: Would increasing the parallelism eventually result in the Busy metric coming down?

    Maybe. Depends on what's causing the bottleneck. It could also make things worse.

    Question 4: Is this the right type of function for this job, or is there a more performant approach?

    I suspect the RichAsyncFunc is the cause of your problems. Is there not a real sink available for your DB? In general, making fewer writes (with larger batches) to the sink is the answer.
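
    To illustrate the "fewer writes, larger batches" idea, here is a minimal plain-Java buffer that collects records and hands them to a writer only when a size threshold is reached or when it is flushed explicitly. `BatchBuffer`, the threshold and the `Consumer<List<String>>` writer are assumptions for illustration; a real DB sink or connector would take the writer's place.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Collects records and writes them in batches instead of one at a time.
    class BatchBuffer {
        private final int maxBatchSize;
        private final Consumer<List<String>> writer; // stand-in for one bulk DB write
        private List<String> pending = new ArrayList<>();

        BatchBuffer(int maxBatchSize, Consumer<List<String>> writer) {
            this.maxBatchSize = maxBatchSize;
            this.writer = writer;
        }

        // Buffer a record; write only once the batch is full.
        void add(String record) {
            pending.add(record);
            if (pending.size() >= maxBatchSize) {
                flush();
            }
        }

        // Write whatever is buffered (e.g. on a timer or at shutdown).
        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            List<String> batch = pending;
            pending = new ArrayList<>();
            writer.accept(batch);
        }
    }
    ```

    Adding seven records to a buffer with a threshold of three and then calling `flush()` results in three writes (of sizes 3, 3 and 1) instead of seven.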

    Also, do you really need to use keyBy twice, once between filter and map, and again between map and window? Each keyBy forces a network shuffle, which is expensive.