In a data analysis Python project I need to use both classes and multiprocessing, and I haven't found a good example of this on Google.
My basic idea - which is probably wrong - is to create a class holding a large variable (a pandas DataFrame in my case), and then to define a method that computes an operation on it (a sum in this case).
import multiprocessing
import time


class C:

    def __init__(self):
        self.__data = list(range(0, 10**7))

    def func(self, nums):
        return sum(nums)

    def start_multi(self):
        for n_procs in range(1, 4):
            print()
            time_start = time.clock()
            chunks = [self.__data[(i - 1) * len(self.__data) // n_procs: i * len(self.__data) // n_procs]
                      for i in range(1, n_procs + 1)]
            pool = multiprocessing.Pool(processes=n_procs)
            results = pool.map_async(self.func, chunks)
            results.wait()
            pool.close()
            results = results.get()
            print(sum(results))
            print("n_procs", n_procs, "total time: ", time.clock() - time_start)


print('sum(list(range(0, 10**7)))', sum(list(range(0, 10**7))))
c = C()
c.start_multi()
The code doesn't work properly: I get the following output
sum(list(range(0, 10**7))) 49999995000000
49999995000000
n_procs 1 total time: 0.45133500000000026
49999995000000
n_procs 2 total time: 0.8055279999999954
49999995000000
n_procs 3 total time: 1.1330870000000033
That is, the computation time increases instead of decreasing. So, what is the error in this code?
But I'm also worried about the RAM usage since, when the variable chunks is created, the memory used by self.__data is effectively doubled. Is it possible, when dealing with multiprocessing code, and more specifically in this code, to avoid this memory waste? (I promise I'll put everything on Spark in the future :) )
It looks like there are a few things at play here:

- Creating chunks was taking about 16% of the time for the cases with multiple processes; the single-process, non-pool version doesn't have that overhead.
- The chunks list is all the raw data for the ranges, which needs to get pickled and sent over to the new processes. It would be much cheaper to send just the start and end indices instead of all the raw data.
- If you time func itself, you'll see that most of the time is not being spent there. That's why you aren't seeing a speedup: most of the time goes to the chunking, pickling, forking, and other overhead.

As an alternative, try switching the chunking technique to compute only the start and end numbers, so you avoid sending over so much data (a minimal sketch of that idea applied to the original sum follows below).
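For instance, here is a minimal sketch of that idea for the original sum, assuming the data really is just range(0, 10**7) so each worker can rebuild and sum its own slice from a (start, end) pair and nothing large is ever pickled (the helper names partial_sum and chunked_bounds are only illustrative):

import multiprocessing


def partial_sum(start_end):
    # Each worker receives only a (start, end) tuple and sums its own range,
    # so no large list is pickled or copied between processes.
    start, end = start_end
    return sum(range(start, end))


def chunked_bounds(total_size, n_procs):
    # Split [0, total_size) into n_procs contiguous (start, end) pairs.
    quotient, remainder = divmod(total_size, n_procs)
    bounds, current = [], 0
    for i in range(n_procs):
        amount = quotient + (remainder if i == 0 else 0)
        bounds.append((current, current + amount))
        current += amount
    return bounds


if __name__ == "__main__":
    n_procs = 3
    with multiprocessing.Pool(processes=n_procs) as pool:
        results = pool.map(partial_sum, chunked_bounds(10**7, n_procs))
    print(sum(results))  # 49999995000000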
Next, I would recommend doing something a little more computationally demanding than computing the sum. For example, you can try counting primes. Here is an example that uses a simple primality test (credited in the code below) and the modified chunking technique; otherwise, I tried to keep the code the same.
import multiprocessing
import time
from math import sqrt
from itertools import count, islice


# credit to https://stackoverflow.com/a/27946768
def isPrime(n):
    return n > 1 and all(n % i for i in islice(count(2), int(sqrt(n) - 1)))


limit = 6


class C:

    def __init__(self):
        pass

    def func(self, start_end_tuple):
        # Each worker receives only a (start, end) tuple and counts the primes in it.
        start, end = start_end_tuple
        primes = []
        for x in range(start, end):
            if isPrime(x):
                primes.append(x)
        return len(primes)

    def get_chunks(self, total_size, n_procs):
        # start and end value tuples
        chunks = []
        # Example: (10, 5) -> (2, 0), so 2 numbers per process
        #          (10, 3) -> (3, 1), so the first process does 4 and the others do 3
        quotient, remainder = divmod(total_size, n_procs)
        current_start = 0
        for i in range(0, n_procs):
            my_amount = quotient
            if i == 0:
                # somebody needs to do the extra work
                my_amount += remainder
            chunks.append((current_start, current_start + my_amount))
            current_start += my_amount
        return chunks

    def start_multi(self):
        for n_procs in range(1, 4):
            time_start = time.perf_counter()
            # chunk the start and end indices instead of the raw data
            chunks = self.get_chunks(10**limit, n_procs)
            pool = multiprocessing.Pool(processes=n_procs)
            results = pool.map_async(self.func, chunks)
            results.wait()
            results = results.get()
            pool.close()
            print(sum(results))
            time_delta = time.perf_counter() - time_start
            print("n_procs {} time {}".format(n_procs, time_delta))


if __name__ == "__main__":
    c = C()
    time_start = time.perf_counter()
    print("serial func(...) = {}".format(c.func((1, 10**limit))))
    print("total time {}".format(time.perf_counter() - time_start))
    c.start_multi()
This should result in a speedup for the multiple processes, assuming you have the cores for it.
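As for the memory part of the question: once the workers only receive (start, end) tuples, no copy of the data is shipped to them at all, and on fork-based platforms (Linux) the children also see the parent's memory copy-on-write. If the data really is a large in-memory array rather than a range, one option on Python 3.8+ is multiprocessing.shared_memory, so every process maps the same buffer instead of receiving a pickled copy. A minimal NumPy-based sketch (the name sum_shared_slice and the shape/dtype are illustrative; sharing a pandas DataFrame this way is more involved):

import numpy as np
from multiprocessing import Pool, shared_memory

SHAPE, DTYPE = (10**7,), np.int64   # illustrative size/type, not the question's DataFrame


def sum_shared_slice(args):
    # Attach to the existing shared block by name; nothing is copied to the worker.
    shm_name, start, end = args
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        data = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
        return int(data[start:end].sum())
    finally:
        shm.close()


if __name__ == "__main__":
    nbytes = int(np.prod(SHAPE)) * np.dtype(DTYPE).itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    data = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    data[:] = np.arange(SHAPE[0])       # the one and only copy of the data

    n_procs = 3
    step = SHAPE[0] // n_procs
    bounds = [(shm.name, i * step, SHAPE[0] if i == n_procs - 1 else (i + 1) * step)
              for i in range(n_procs)]
    with Pool(processes=n_procs) as pool:
        print(sum(pool.map(sum_shared_slice, bounds)))   # 49999995000000

    shm.close()
    shm.unlink()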