We have a stream of rules that must be run on n number of records stored in a file. Given below is the pseudocode of the Flink job that I have written.
stream
.flatMap(
// 1. Reading the file which is a JSON Array and transforming it to a List<Records>
// using ObjectMapper
// 2. Iterating through the list of records and creating a Tuple2<Rule, Record> and
// adding it to the collectors.collect()
// For 1 rule if there were 10000 Records in the file after this flatMap there will be
// 10000 Tuple2<Rule, Record>
)
.rebalance()
.map(
// Doing the actual rule processing
)
.print();
I have run this as a standalone job with a job manager and one task manager. I have tried adding the parallelism of different values with different numbers of task slots in the task manager but I am getting almost similar performance.
I have tried to run the same job on different machines with different numbers of cores. When the cores are increasing the benchmark gives better results but not when the parallelism is increasing.
Can you please help me with this issue.
If you dig into this with a profiler you might be able to get a more definitive understanding of what's going on, but perhaps I can offer some insight.
You job has two tasks, connected by a rebalance. The first task is executing the flatmap, and the second task is executing the map and print operators. Each of these tasks runs in a separate thread, so when you run the job with a parallelism of 2, that results in 4 threads that are hopefully keeping 4 cores rather busy running your code. That leaves another 4 cores to handle the additional work being done by the kernel, and for Flink's overhead (e.g., running the job manager).
I would expect somewhat better performance when you increase the parallelism from 2 to 4, so I'm not sure why that isn't the case, but I'm not surprised that 8 cores aren't enough to allow this job to run well with a parallelism of 8.
I suggest you try this without the rebalance. The rebalance is quite expensive, as it's forcing an additional round of serialization and deserialization. If you remove the rebalance then each slot will only use 1 thread, and you may get better overall throughput with a parallelism of 8 than you are currently able to achieve.