I have a script that I'm trying to convert to multiprocessing because I'll have a large amount of data to process. I tried to put together some code that I found during my research, but I can't make it work with multiprocessing.

The idea of the script is to generate all combinations of players of a given size, discard those whose total index exceeds 30, and keep the combination with the highest total value, spreading the evaluation across several worker processes.

Here is the script:
import itertools
from timeit import default_timer as timer
from math import factorial
from multiprocessing import Process, Value, Pool, Manager, Lock

def total_combo(n, r):
    return factorial(n) // factorial(r) // factorial(n - r)

def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def evaluate_combination(combinations, best_combination, max_value, lock):
    for combination in combinations:
        #total_index = sum(people['index'] for people in combination)
        total_value = sum(people['value'] for people in combination)
        # Check if the combination meets the constraints
        if total_value > max_value.value:
            with lock:
                best_combination = list(combination)
                max_value.value = total_value

def generate_filtered_combinations(players, r):
    return itertools.filterfalse(lambda y: sum(x['index'] for x in y) > 30, itertools.combinations(players, r))

if __name__ == "__main__":
    start = timer()
    # Example usage
    players = [
        {"people": "John Doe1", "index": 8, "value": 15},
        {"people": "John Doe2", "index": 7, "value": 28},
        {"people": "John Doe3", "index": 6, "value": 13},
        {"people": "John Doe4", "index": 7, "value": 11},
        {"people": "John Smith1", "index": 7, "value": 11},
        {"people": "John Smith2", "index": 6, "value": 9},
        {"people": "John Smith3", "index": 6, "value": 10},
        # .... just some data for example
    ]

    manager = Manager()
    max_value = manager.Value('i', 0)
    best_combination = manager.list()
    lock = Lock()

    for r in range(6, 8):
        print(r, total_combo(len(players), r))  # max possible number of combinations for this r and these players
        combos = generate_filtered_combinations(players, r)  # generate all combinations, filtered
        pool = Pool(4)  # let's use 4 workers
        cursor_iterator = iterator_slice(combos, 1000)  # slice it
        queue = []  # a queue for our current worker async results; a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(evaluate_combination, [next(cursor_iterator), best_combination, max_value, lock]))
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining tasks finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                if not process.ready():  # this sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()

    # Print the best combination
    for people in best_combination:
        print(people['people'], people['index'])
    print(max_value.value)
    end = timer()
    print(end - start)
For now, max_value is never updated (and neither is best_combination). The with lock: block doesn't seem to be working.

Thanks in advance for your input.
I confess to having had some difficulty in following your logic, but I do see a few issues:

1. Given your players test data and your definition of generate_filtered_combinations(players, r), with argument r taking on the values 6 and 7 there is no way for this function to return any combinations that match its criterion: the smallest index in your data is 6, so any combination of 6 or more players has a total index of at least 36, which exceeds 30. This is why max_value, for instance, never gets updated. So I am changing r to take on the values 2 and 3. Likewise, I am slicing up the combinations in chunks of size 3.

2. The lock variable needs "adjustment", i.e. the locking should be done sooner.

3. best_combination = list(combination) does not update the managed list with the elements of combination; it replaces what was a proxy to a managed list with a reference to a local list. Unfortunately, the proxy does not support the list.clear() method, so I had to resort to popping off the elements one by one until the list was empty and then calling the extend method to add the new combination. You might consider creating your own version of a managed list that does support the clear method (see the sketch at the end of this answer).

4. Since max_value is only updated while the Lock instance is held, there is no need to create a separate lock for the max_value shared variable, so I have added lock=False to its initialization.

Where I was most confused was the use of queue, the loop that uses it, and why you are looping on r. I have greatly simplified the code (perhaps erroneously) by eliminating this queue and using the method pool.imap_unordered. I also no longer pass the variables best_combination, max_value, and lock on every invocation of evaluate_combination; instead, each pool process is initialized once with global variables holding these values.
import itertools
from timeit import default_timer as timer
from math import factorial
from multiprocessing import Process, Value, Pool, Manager, Lock

def total_combo(n, r):
    return factorial(n) // factorial(r) // factorial(n - r)

def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def init_pool_processes(*args):
    global best_combination, max_value, lock
    best_combination, max_value, lock = args

def evaluate_combination(combinations):
    for combination in combinations:
        #total_index = sum(people['index'] for people in combination)
        total_value = sum(people['value'] for people in combination)
        with lock:  # Booboo: locking moved
            # Check if the combination meets the constraints
            if total_value > max_value.value:
                #best_combination = list(combination)  # Booboo
                while len(best_combination):
                    best_combination.pop()
                best_combination.extend(combination)
                max_value.value = total_value

def generate_filtered_combinations(players, r):
    return itertools.filterfalse(lambda y: sum(x['index'] for x in y) > 30, itertools.combinations(players, r))

if __name__ == "__main__":
    start = timer()
    # Example usage
    players = [
        {"people": "John Doe1", "index": 8, "value": 15},
        {"people": "John Doe2", "index": 7, "value": 28},
        {"people": "John Doe3", "index": 6, "value": 13},
        {"people": "John Doe4", "index": 7, "value": 11},
        {"people": "John Smith1", "index": 7, "value": 11},
        {"people": "John Smith2", "index": 6, "value": 9},
        {"people": "John Smith3", "index": 6, "value": 10},
        # .... just some data for example
    ]

    manager = Manager()
    max_value = manager.Value('i', 0, lock=False)  # Booboo - don't need a lock here
    best_combination = manager.list()
    lock = Lock()
    pool = Pool(4, initializer=init_pool_processes, initargs=(best_combination, max_value, lock))  # let's use 4 workers

    for r in range(2, 4):
        print(r, total_combo(len(players), r))  # max possible number of combinations for this r and these players
        combos = generate_filtered_combinations(players, r)  # generate all combinations, filtered
        cursor_iterator = iterator_slice(combos, 3)  # slice it
        it = pool.imap_unordered(evaluate_combination, cursor_iterator)
        # If return values from evaluate_combination were required,
        # you would un-comment the next two lines:
        #for result in it:
        #    print(result)

    # Wait for tasks to complete:
    pool.close()
    pool.join()

    # Print the best combination
    for people in best_combination:
        print(people['people'], people['index'])
    print(max_value.value)
    end = timer()
    print(end - start)
Prints:
2 21
3 35
John Doe1 8
John Doe2 7
John Doe3 6
56
0.30639590000000005
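A note on point 3 above: if you want a managed list that does support clear(), one possible approach is to register your own list proxy with a manager subclass. This is only a sketch: the names MyManager, ClearableListProxy, and clearable_list are made up for the example, and it relies on multiprocessing.managers.MakeProxyType, an undocumented but long-standing helper.

from multiprocessing.managers import SyncManager, MakeProxyType

# Build a proxy type that exposes clear() alongside a few common list methods.
# (Hypothetical names, for illustration only.)
ClearableListProxy = MakeProxyType(
    "ClearableListProxy",
    ("__getitem__", "__setitem__", "__len__", "append", "extend", "pop", "clear"),
)

class MyManager(SyncManager):
    pass

# Register a typeid backed by an ordinary list but wrapped in ClearableListProxy.
MyManager.register("clearable_list", list, ClearableListProxy)

if __name__ == "__main__":
    with MyManager() as manager:
        best_combination = manager.clearable_list()
        best_combination.extend([1, 2, 3])
        best_combination.clear()      # now supported by the proxy
        best_combination.extend([4, 5])
        print(best_combination[:])    # [4, 5]

Such a proxy can be passed to the pool initializer exactly like the manager.list() proxy above, which would let evaluate_combination call best_combination.clear() instead of popping elements in a loop.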