python, multiprocessing, pool

python - multiprocessing issues with class fields and methods


In a Python data analysis project I need to use both classes and multiprocessing, and I haven't found a good example of the two combined on Google.

My basic idea, which is probably wrong, is to create a class that holds a large variable (a pandas DataFrame in my case) and to define a method that computes an operation on it (a sum in this case).

import multiprocessing
import time

class C:
    def __init__(self):
        self.__data = list(range(0, 10**7))

    def func(self, nums):
        return sum(nums)

    def start_multi(self):
        for n_procs in range(1, 4):
            print()
            # time.clock() was removed in Python 3.8; perf_counter() measures wall-clock time
            time_start = time.perf_counter()
            chunks = [self.__data[(i-1)*len(self.__data)//n_procs: i*len(self.__data)//n_procs] for i in range(1, n_procs+1)]
            pool = multiprocessing.Pool(processes=n_procs)
            results = pool.map_async(self.func, chunks)
            results.wait()
            pool.close()
            results = results.get()
            print(sum(results))
            print("n_procs", n_procs, "total time: ", time.perf_counter() - time_start)

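if __name__ == '__main__':
    # the guard keeps worker processes from re-running this block when they import the module
    print('sum(list(range(0, 10**7)))', sum(list(range(0, 10**7))))
    c = C()
    c.start_multi()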

The code doesn't behave as expected. I get the following output:

sum(list(range(0, 10**7))) 49999995000000

49999995000000
n_procs 1 total time:  0.45133500000000026

49999995000000
n_procs 2 total time:  0.8055279999999954

49999995000000
n_procs 3 total time:  1.1330870000000033

That is, the computation time increases with the number of processes instead of decreasing. What is wrong with this code?

I'm also worried about RAM usage: when the variable chunks is created, the memory used by self.__data is effectively doubled. Is it possible, in multiprocessing code in general and in this code in particular, to avoid this memory waste? (I promise I'll move everything to Spark in the future :) )


Solution

  • It looks like there are a few things at play here:

    1. The chunking operation is pretty slow. On my computer the generation of the chunks was taking about 16% of the time for the cases with multiple processes. The single process, non-pool, version doesn't have that overhead.
    2. You are sending a lot of data into your processes. The chunks list holds all the raw data for the ranges, which has to be pickled and sent over to the new processes. It would be much cheaper to send just the start and end indices instead of the raw data.
    3. In general, if you put timers in your func you'll see that most of the time is not being spent there. That's why you aren't seeing a speedup. Most of the time is spent on the chunking, pickling, forking, and other overhead.
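
To get a feel for point 2, the pickled size of a data chunk can be compared with the pickled size of a pair of indices. This is only a rough sketch: the list size is an arbitrary choice for illustration, and the exact byte counts depend on the pickle protocol in use.

```python
import pickle

data = list(range(100_000))

# What the original approach sends per task: a slice of the raw data.
chunk_bytes = len(pickle.dumps(data[:50_000]))

# What an index-based approach sends per task: two integers.
bounds_bytes = len(pickle.dumps((0, 50_000)))

print("pickled chunk:", chunk_bytes, "bytes")
print("pickled (start, end):", bounds_bytes, "bytes")
```

The gap grows with the size of the data, while the index pair stays at a few bytes.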

    As an alternative, you should try switching the chunking technique to just compute the start and end numbers and to avoid sending over so much data.
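
For the sum in the question, index-based chunking could look roughly like the sketch below. It assumes each worker can rebuild or load its own slice from the indices alone (here simply `range(start, end)`); with a real DataFrame the worker would need some other way to reach the data. The names `index_chunks` and `sum_range` are illustrative and not part of the original code.

```python
import multiprocessing


def index_chunks(total_size, n_procs):
    """Split [0, total_size) into n_procs (start, end) index pairs."""
    return [(i * total_size // n_procs, (i + 1) * total_size // n_procs)
            for i in range(n_procs)]


def sum_range(bounds):
    """Worker: rebuild its slice from the indices, then sum it."""
    start, end = bounds
    return sum(range(start, end))


if __name__ == "__main__":
    n_procs = 3
    with multiprocessing.Pool(processes=n_procs) as pool:
        # only small (start, end) tuples are pickled and sent to the workers
        partial_sums = pool.map(sum_range, index_chunks(10**7, n_procs))
    print(sum(partial_sums))
```

The worker is a module-level function rather than a method, so no instance has to be pickled along with each task.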

    Next, I would recommend testing with something more computationally expensive than a sum, such as counting primes. The example below uses a simple primality test (credited in the code) and a modified chunking technique. Otherwise, I tried to keep the code the same.

    import multiprocessing
    import time
    from math import sqrt
    from itertools import count, islice
    
    # credit to https://stackoverflow.com/a/27946768
    def isPrime(n):
        return n > 1 and all(n%i for i in islice(count(2), int(sqrt(n)-1)))
    
    limit = 6
    class C:
        def __init__(self):
            pass
    
        def func(self, start_end_tuple):
            start, end = start_end_tuple
            primes = []
            for x in range(start, end):
                if isPrime(x):
                    primes.append(x)
            return len(primes)
    
        def get_chunks(self, total_size, n_procs):
            # start and end value tuples
            chunks = []
    
            # Example: divmod(10, 5) == (2, 0), so each process gets 2 numbers
            # divmod(10, 3) == (3, 1), so the first process does 4 and the others do 3
            quotient, remainder = divmod(total_size, n_procs)
            current_start = 0
            for i in range(0, n_procs):
                my_amount = quotient
                if i == 0:
                    # somebody needs to do extra
                    my_amount += remainder
                chunks.append((current_start, current_start + my_amount))
                current_start += my_amount
            return chunks
    
        def start_multi(self):
            for n_procs in range(1, 4):
                # time.clock() was removed in Python 3.8; perf_counter() measures wall-clock time
                time_start = time.perf_counter()
                # chunk the start and end indices instead
                chunks = self.get_chunks(10**limit, n_procs)
                pool = multiprocessing.Pool(processes=n_procs)
                results = pool.map_async(self.func, chunks)
                results.wait()
                results = results.get()
                # release the worker processes before starting the next round
                pool.close()
                pool.join()
                print(sum(results))
                time_delta = time.perf_counter() - time_start
                print("n_procs {} time {}".format(n_procs, time_delta))
    
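    if __name__ == "__main__":
        # the guard keeps worker processes from re-running this block when they import the module
        c = C()
        time_start = time.perf_counter()
        print("serial func(...) = {}".format(c.func((1, 10**limit))))
        print("total time {}".format(time.perf_counter() - time_start))
        c.start_multi()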
    

    This should result in a speedup when using multiple processes, assuming you have the cores for it.
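
Whether the cores are there can be checked before choosing how many processes to try. A minimal sketch, where the cap of 3 only mirrors the `range(1, 4)` loop above and `proc_counts` is an illustrative name:

```python
import os

# os.cpu_count() may return None when the count cannot be determined
available = os.cpu_count() or 1

# never try more processes than there are cores, up to the 3 used above
proc_counts = list(range(1, min(3, available) + 1))
print("cores:", available, "process counts to try:", proc_counts)
```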