python-3.x multithreading parallel-processing multiprocessing joblib

Python multiprocessing calls become slower for later calls


I am trying to process a huge set of workloads by calling a function on a list of arguments, as follows:

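import multiprocessing as mp

print("Number of processors: ", mp.cpu_count())

pool = mp.Pool(mp.cpu_count())

try:
    results = pool.map_async(consume_one, [list-of-arguments]).get()
except Exception as e:
    # bind the exception to a name, otherwise print(e) raises a NameError
    print(e)
finally:
    pool.close()
    pool.join()  # wait for the worker processes to exit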

For each call of consume_one() we pass one value from '[list-of-arguments]', and inside the function I log the start and end time of consume_one().

The observed values are:

Completed processing for ... in 0:03:34.283025
Completed processing for ... in 0:04:24.109049
Completed processing for ... in 0:04:58.464374
Completed processing for ... in 0:05:11.830404
Completed processing for ... in 0:08:32.234539
Completed processing for ... in 0:09:09.725937
Completed processing for ... in 0:09:10.968685
Completed processing for ... in 0:09:51.642501
Completed processing for ... in 0:10:58.076675
Completed processing for ... in 0:12:30.905190
Completed processing for ... in 0:14:01.051716

As the logged times show, each subsequent call to the same function takes longer and longer. This is not caused by the arguments, which are more or less the same for all calls.

My question is:

Why might this be happening?
How can I debug this?


Solution

  • Q1 :
    "Why might this be happening?"

    A1 :
    A growing accumulation of processing inefficiencies.

    Q2 :
    "How can I debug this?"

    A2 :
    Best by understanding where most of the processing efficiency gets lost: first in principle, then by using practical techniques to reduce, or better to eliminate altogether, such expensive overhead hotspots.
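As a first practical step, one can separate the time a task spent waiting for a free worker from the time it spent actually working. The sketch below is only an illustration: consume_one() here is a hypothetical stand-in for the real function, and timed_consume_one() and the report keys are names invented for this example.

```python
import os
import time
from multiprocessing import Pool


def consume_one(arg):
    """Hypothetical stand-in for the real consume_one()."""
    return sum(i * i for i in range(arg))


def timed_consume_one(task):
    """Run one task and report how long it waited and how long it worked."""
    arg, submitted = task            # 'submitted' is a wall-clock stamp set by the parent
    started = time.time()
    result = consume_one(arg)
    finished = time.time()
    return {
        "pid": os.getpid(),               # which worker process handled the task
        "waited": started - submitted,    # time queued before a worker picked it up
        "worked": finished - started,     # time spent inside consume_one()
        "result": result,
    }


if __name__ == "__main__":
    tasks = [(20_000, time.time()) for _ in range(8)]
    with Pool(processes=2) as pool:
        for report in pool.imap_unordered(timed_consume_one, tasks):
            print(f"pid={report['pid']} "
                  f"waited={report['waited']:.3f}s worked={report['worked']:.3f}s")
```

If "worked" grows from task to task while the arguments stay comparable, the slowdown sits inside the workers (memory pressure, throttling, garbage collection). If only "waited" grows, the tasks are merely queued behind each other.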

    First, isolate the individual root causes of the processing inefficiencies:

    • costs of spawning Python Interpreter processes (add-on overhead). On some O/S-es each worker receives a full, I repeat a FULL, copy of the parent process. That can mean moving a few [GB] RAM-to-RAM, which makes physical memory-I/O the bottleneck on your hardware. Check (a) how many memory-I/O channels exist between the CPU and DRAM on your motherboard, and (b) whether the many copied process-memory allocations have turned the O/S virtual-memory management into swapping mode. Once the (number of processes × replicated memory) no longer fits into the physical-RAM footprint of your platform, the O/S virtual-memory manager starts "emulating" the missing RAM. It serves memory-access needs reactively, on demand (see LRU-policy et al), by swapping large chunks of process-reserved physical-RAM data out onto a disk whenever the policy says so. Disk means ~ 1E5+ times larger data-access latencies, plus further add-on costs from jamming, if not outright blocking, every other physical-RAM to CPU data-flow while huge blocks of swapped [GB]-s move to and from such slow swap-space storage. This cost is then multiplied by a factor of ~ 2X: moving the next process's block from the slow swap space back into the now "freed" RAM costs in principle the same as the slo-mo move of the previous block out to make room for it, and that next process keeps the RAM only for as long as the O/S virtual-memory manager considers fair.
      This is a sort of "SOKOBAN" problem: zillions of [GB]-s are moved through just a pair of CPU-to-physical-RAM memory-I/O channels, with a long, long waiting queue for a 1E5+ times slower disk to { read | write } each next filesystem block. It is so brutal for the end-to-end efficiency of the computing strategy that it is often called RAM-thrashing, or the swap-juggling performance anti-pattern.

    • costs of Python Interpreter process-to-process data transfer. SER/xfer/DES (serialise, transfer, deserialise) takes place both for the call-signature parameters passed and for the results returned. This is an example where this very performance anti-pattern could in principle have been avoided, saving all the add-on costs of an unnecessary SER/xfer/DES data-flow of ~ 75 [TB] (! terabytes of physical-RAM memory-I/O, CPU time in pickle.dumps(), O/S pipe transfers, CPU time in pickle.loads() and yet more physical-RAM memory-I/O). All of that was wasted on a data-flow injected pointlessly, just through the wrong use of a "slim"-SLOC, list-comprehension based "external" iterator, like the one "hidden" inside the .map_async() method used above.

    • costs of inefficiencies in your code that fail to re-use expensively pre-fetched data while it is still in the cache lines

    • costs of hardware thermal throttling of the "marketing"-promoted CPU frequencies, once the cores start to do some heavy work and get warm. On some hardware throttling can be deferred by shifting a job onto another, cooler CPU-core, yet at the cost of losing the core-local cached data, so the code has to re-fetch it at the most expensive price in the TimeDOMAIN right after it was moved onto the cooler, higher-frequency core. In the situation here, your code keeps a worker process busy on every CPU-core ( not to mention the O/S process-scheduler add-on costs ), so there are no cooler CPU-cores left and your hardware resorts to thermal throttling, i.e. it works at a lower and lower CPU-core frequency

    • costs of garbage collections that are not explicitly controlled, which are a chapter of their own
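Of the costs listed above, the SER/xfer/DES part is the easiest to estimate in isolation. A minimal sketch, assuming the arguments are picklable with the default protocol; pickle_round_trip_cost() and sample_argument are hypothetical names for this illustration, and the figure leaves out the pipe transfer itself, so it is a lower bound:

```python
import pickle
import time


def pickle_round_trip_cost(obj, repeats=5):
    """Return (payload_bytes, best_seconds) for one dumps() + loads() round trip."""
    payload = pickle.dumps(obj)
    best = float("inf")
    for _ in range(repeats):
        t0 = time.perf_counter()
        pickle.loads(pickle.dumps(obj))      # serialise, then deserialise again
        best = min(best, time.perf_counter() - t0)
    return len(payload), best


if __name__ == "__main__":
    sample_argument = list(range(100_000))   # hypothetical argument; use a real one
    size, seconds = pickle_round_trip_cost(sample_argument)
    print(f"{size} bytes per call, {seconds * 1e3:.2f} ms per round trip")
```

Multiplying the per-call figures by the number of calls (and again for the returned results) gives a rough idea of how much data-flow the "external" iterator injects.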

    For detailed argumentation on pros and cons & overhead testing templates you might like this collection of multiprocessing.Pool() & joblib.Parallel() related

    If you are interested not only in debugging the problem but also in improving the end-to-end process performance, try to re-factor the code so that this cannot happen in principle. If you are sure you want that many processes (while these are most probably memory-starved, waiting for not-yet-fetched data from slow and overloaded or blocked physical-RAM-I/O channels), move from an "outer" item-iterator hosted in the __main__ Python Interpreter to block-based work. There you command the worker processes to iterate "internally" over their own disjoint block of the list, as partitioned by the __main__ Python Interpreter, which avoids all the awfully many repetitions of SER/xfer/DES add-on costs.

    Python Interpreter process instantiation works in the known manner: it creates a stateful copy of the __main__ Python Interpreter. (Previous attempts to reduce the amount of data copied by using O/S-provided "forking" have been documented to cause problems, or can even lead to deadlocks, so due care is needed here if the code has to be robust, goes into production or provides some kind of critical intelligence.) The list itself will therefore already be present in the worker processes. Each worker can iterate over its "private" part (one of the disjoint blocks of this list) just by a commanded index range, i.e. without expensively passing any parameter to the .Pool() processes other than that index range, as trivial as ( startHere, stopHere ) tuples that together cover the whole list.

    The costs of returning results depend on what the results consist of. Tools exist for doing this efficiently, be it by block transfer between processes once a list block is completed, or by compressed file-I/O storage. Details matter, as always, if performance is the goal. As each item takes ~ 3+ minutes in the as-is state of the consume_one() processing, there are plenty of opportunities to speed this up, in the name of overall processing efficiency and performance.
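The block-based approach described above might look roughly like the sketch below. It is not a drop-in replacement: ITEMS and consume_one() are hypothetical stand-ins for the real list and function, and make_blocks() and consume_block() are helper names invented here. The sketch assumes the list is built at module level, so that workers get it either by inheritance (fork) or by re-importing the module (spawn), never by per-task pickling.

```python
import multiprocessing as mp

# Module-level data: inherited by fork()-ed workers and rebuilt on import by
# spawn-ed workers, so it is never pickled and sent per task.
ITEMS = list(range(1_000))  # hypothetical stand-in for the real argument list


def consume_one(item):
    """Hypothetical stand-in for the real consume_one()."""
    return item * item


def make_blocks(n_items, n_blocks):
    """Split range(n_items) into n_blocks disjoint (start, stop) index ranges."""
    base, extra = divmod(n_items, n_blocks)
    blocks, start = [], 0
    for i in range(n_blocks):
        stop = start + base + (1 if i < extra else 0)
        blocks.append((start, stop))
        start = stop
    return blocks


def consume_block(block):
    """Iterate 'internally' over one block; only two integers were passed in."""
    start, stop = block
    return [consume_one(ITEMS[i]) for i in range(start, stop)]


if __name__ == "__main__":
    n_workers = mp.cpu_count()
    with mp.Pool(n_workers) as pool:
        per_block = pool.map(consume_block, make_blocks(len(ITEMS), n_workers))
    results = [r for block in per_block for r in block]
    print(len(results), results[:5])
```

Here the results still travel back through pickling, one list per block. If they are large, writing them to storage from inside consume_block() and returning only a short status may be the cheaper design.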

    Block-based "internal" iteration can keep calling the as-is consume_one(), if that cannot be made faster by performance-improving tools. Such tools include numpy-vectorised processing (which internally uses high-performance, multicore, cache-re-use-efficient, bare-metal optimised libraries) or numba JIT-compiled processing (where the high re-use count of the JIT/LLVM-compiled code works in your favour, towards improving the overall performance).