
Python multiprocessing for large dataset evaluation with shared data


I have a script that I am trying to convert to multiprocessing because I will have a large amount of data to process. I tried to put together some code that I found during my research, but I can't make it work with multiple processes.

The idea of the script is as follows:

  • Generate all combinations and filter them
  • Slice the combinations into chunks of x (1000) combos to be evaluated
  • Create multiple processes, each evaluating one sliced chunk
    • If the sum of the values of a combination is greater than the current maximum, update the maximum to this value; the new maximum must be shared with the other processes
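A single-process baseline of these four steps can be useful for checking whatever the parallel version produces. The sketch below is an illustration under assumptions, not the original code: best_combination_sequential is a hypothetical name, and it uses the seven sample players and the index limit of 30 from the script that follows.

```python
import itertools

def iterator_slice(iterable, length):
    """Yield successive tuples of at most `length` items from iterable."""
    iterator = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(iterator, length))
        if not chunk:
            return
        yield chunk

def best_combination_sequential(players, r_values, max_index=30, chunk_size=1000):
    """Hypothetical single-process baseline: generate, filter, slice, evaluate."""
    best_value, best_combo = None, None
    for r in r_values:
        # Steps 1-2: all r-combinations whose total index stays within max_index.
        combos = (c for c in itertools.combinations(players, r)
                  if sum(p["index"] for p in c) <= max_index)
        # Step 3: walk through the combinations chunk by chunk.
        for chunk in iterator_slice(combos, chunk_size):
            # Step 4: keep the combination with the largest total value.
            for combo in chunk:
                total_value = sum(p["value"] for p in combo)
                if best_value is None or total_value > best_value:
                    best_value, best_combo = total_value, combo
    return best_value, best_combo

# The seven sample players from the question.
players = [
    {"people": "John Doe1", "index": 8, "value": 15},
    {"people": "John Doe2", "index": 7, "value": 28},
    {"people": "John Doe3", "index": 6, "value": 13},
    {"people": "John Doe4", "index": 7, "value": 11},
    {"people": "John Smith1", "index": 7, "value": 11},
    {"people": "John Smith2", "index": 6, "value": 9},
    {"people": "John Smith3", "index": 6, "value": 10},
]

value, combo = best_combination_sequential(players, range(2, 4))
print(value, [p["people"] for p in combo])  # 56 ['John Doe1', 'John Doe2', 'John Doe3']
```

Called with range(6, 8), the values of r used in the script, it returns (None, None), because no six or seven of these players have a total index of 30 or less.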

Here's the script:

import itertools
from timeit import default_timer as timer
from math import factorial
from multiprocessing import Process, Value, Pool, Manager, Lock

def total_combo(n, r):
    return factorial(n) // factorial(r) // factorial(n-r)

def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res


def evaluate_combination(combinations, best_combination, max_value, lock):
           
    for combination in combinations:
        #total_index = sum(people['index'] for people in combination)
        total_value = sum(people['value'] for people in combination)
        
        # Check if the combination meets the constraints
        if total_value > max_value.value:
            with lock:
                best_combination = list(combination)
                max_value.value = total_value

def generate_filtered_combinations(players,r):
    return itertools.filterfalse(lambda y: sum(x['index'] for x in y) > 30, itertools.combinations(players, r))
    
if __name__ == "__main__":
    
    start = timer()
    
    # Example usage
    players = [
        {"people":"John Doe1","index":8,"value":15},
        {"people":"John Doe2","index":7,"value":28},
        {"people":"John Doe3","index":6,"value":13},
        {"people":"John Doe4","index":7,"value":11},
        {"people":"John Smith1","index":7,"value":11},
        {"people":"John Smith2","index":6,"value":9},
        {"people":"John Smith3","index":6,"value":10},
        # .... just some data for example
    ];
    
    manager = Manager()
        
    max_value = manager.Value('i', 0)
    best_combination = manager.list()
    lock = Lock()
       
    for r in range(6,8):
        print(r, total_combo(len(players),r)) # max possible items in the combination for r with players
        combos = generate_filtered_combinations(players,r) # generate all combinations filtered
              
        pool = Pool(4)  # lets use 4 workers
        cursor_iterator = iterator_slice(combos, 1000)  # slice it
        queue = []  # a queue for our current worker async results, a deque would be faster
        
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(evaluate_combination, [next(cursor_iterator), best_combination, max_value, lock]))
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()
        
    # Print the best combination
    for people in best_combination:
        print(people['people'],people['index'])
    
    print(max_value.value)

    end = timer()
    print(end - start)

For now, max_value is never updated (and neither is best_combination). The with lock: block does not seem to work.

Thanks in advance for your input


Solution

  • I confess to having had some difficulty in following your logic, but I do see a few issues:

    1. With your players test data and your definition of generate_filtered_combinations(players, r), with argument r taking on values 6 and 7, there is no way for this function to return any combinations that match its criterion: even the six smallest indexes add up to 6 + 6 + 6 + 7 + 7 + 7 = 39, which exceeds the limit of 30. This is why max_value, for instance, never gets updated. So I am changing r to take on values 2 and 3. Likewise, I am slicing the combinations into chunks of size 3.
    2. Your use of the lock variable to serialize the updates needs adjusting: the lock must be acquired before the comparison, not just around the assignment. Otherwise two processes can both pass the total_value > max_value.value test, and the one with the smaller value may write last, overwriting the better result.
    3. Your statement best_combination = list(combination) does not update the managed list with the elements of combination; it replaces what was a proxy to a managed list with a reference to a local list (and since best_combination is a function parameter, the change is not even visible to the caller). Unfortunately, the proxy does not support the list.clear() method, so I had to resort to popping off the elements one by one until the list was empty and then calling the extend method to add the new combination to the list. You might consider creating your own version of a managed list that does support the clear method.
    4. Since you are creating a separate Lock instance, there is no need for max_value to have a lock of its own, so I have added lock=False to its initialization. (For a Value created through a Manager, this argument is accepted but ignored; managed values have no built-in lock.)
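On point 3, slice assignment may be a simpler way to replace the contents of the managed list than the pop loop. The list proxy exposes __setitem__, so best_combination[:] = combination is forwarded to the manager as a single call instead of one round trip per popped element. This only replaces the pop/extend step; the comparison and update still need to happen under the lock. A minimal sketch (replace_contents is a hypothetical helper, and the list contents are placeholders):

```python
from multiprocessing import Manager

def replace_contents(target, items):
    """Replace all elements of a list or list proxy with items."""
    # On a manager ListProxy this is one __setitem__ call executed by the
    # manager, rather than one pop() round trip per element plus an extend().
    target[:] = items

if __name__ == "__main__":
    with Manager() as manager:
        best_combination = manager.list(["old", "entries"])  # placeholder contents
        replace_contents(best_combination, ("John Doe1", "John Doe2"))
        print(list(best_combination))
```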

    Where I was most confused was the use of queue and the loop that uses it, and why you are looping on r. I have greatly simplified the code (perhaps erroneously) by eliminating this queue and using the pool.imap_unordered method. I also no longer pass the variables best_combination, max_value, and lock on every invocation of evaluate_combination. Instead, each pool process is initialized once, through the pool's initializer, with global variables holding these values.
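Sharing the lock through the pool initializer is more than a simplification. A multiprocessing.Lock may only be passed to child processes through inheritance, that is, when the process is created; pickling it at any other time, which is what apply_async does with its arguments, raises RuntimeError. In the question's code no task was ever submitted, because the filter left nothing to evaluate, but with values of r that do produce combinations every apply_async call would fail this way, and the error would only surface through get(), which the polling loop never calls. (A manager.Lock() proxy, by contrast, can be passed as an ordinary argument.) The sketch below only reproduces the pickling error; try_pickle_lock is a hypothetical name:

```python
import multiprocessing
import pickle

def try_pickle_lock():
    """Try to pickle a multiprocessing.Lock; return the error message, or None on success."""
    lock = multiprocessing.Lock()
    try:
        pickle.dumps(lock)
    except RuntimeError as exc:
        # The lock refuses to be pickled outside of process creation.
        return str(exc)
    return None

print(try_pickle_lock())
```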

    import itertools
    from timeit import default_timer as timer
    from math import factorial
    from multiprocessing import Pool, Manager, Lock
    
    def total_combo(n, r):
        return factorial(n) // factorial(r) // factorial(n-r)
    
    def iterator_slice(iterator, length):
        iterator = iter(iterator)
        while True:
            res = tuple(itertools.islice(iterator, length))
            if not res:
                break
            yield res
    
    def init_pool_processes(*args):
        global best_combination, max_value, lock
    
        best_combination, max_value, lock = args
    
    def evaluate_combination(combinations):
    
        for combination in combinations:
            #total_index = sum(people['index'] for people in combination)
            total_value = sum(people['value'] for people in combination)
    
            with lock: # Booboo: locking moved
                # Check if the combination meets the constraints
                if total_value > max_value.value:
                    #best_combination = list(combination) # Booboo
                    while len(best_combination):
                        best_combination.pop()
                    best_combination.extend(combination)
                    max_value.value = total_value
    
    def generate_filtered_combinations(players,r):
        return itertools.filterfalse(lambda y: sum(x['index'] for x in y) > 30, itertools.combinations(players, r))
    
    if __name__ == "__main__":
    
        start = timer()
    
        # Example usage
        players = [
            {"people":"John Doe1","index":8,"value":15},
            {"people":"John Doe2","index":7,"value":28},
            {"people":"John Doe3","index":6,"value":13},
            {"people":"John Doe4","index":7,"value":11},
            {"people":"John Smith1","index":7,"value":11},
            {"people":"John Smith2","index":6,"value":9},
            {"people":"John Smith3","index":6,"value":10},
            # .... just some data for example
        ]
    
        manager = Manager()
    
        max_value = manager.Value('i', 0, lock=False) # Booboo - don't need lock here
        best_combination = manager.list()
        lock = Lock()
    
        pool = Pool(4, initializer=init_pool_processes, initargs=(best_combination, max_value, lock))  # use 4 worker processes
        for r in range(2, 4):
            print(r, total_combo(len(players),r)) # max possible items in the combination for r with players
            combos = generate_filtered_combinations(players,r) # generate all combinations filtered
    
            cursor_iterator = iterator_slice(combos, 3)  # slice it
            it = pool.imap_unordered(evaluate_combination, cursor_iterator)
            # If the return values from evaluate_combination were required,
            # you would uncomment the next two lines (iterating over it would
            # also re-raise any exception raised in a worker):
            #for result in it:
            #    print(result)
    
        # Wait for tasks to complete:
        pool.close()
        pool.join()
    
        # Print the best combination
        for people in best_combination:
            print(people['people'],people['index'])
    
        print(max_value.value)
    
        end = timer()
        print(end - start)
    

    Prints:

    2 21
    3 35
    John Doe1 8
    John Doe2 7
    John Doe3 6
    56
    0.30639590000000005
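Since the shared maximum is only used to decide which combination to keep, a different technique, a map-and-reduce instead of shared state, might be simpler: each task returns the best (value, combination) of its own chunk and the parent keeps the overall maximum. This drops the Manager, the lock and the global variables, and because the results are iterated, any exception raised in a worker is re-raised in the parent. A sketch using the same sample data (filtered_combinations, chunk_best and best_overall are hypothetical names):

```python
import itertools
from multiprocessing import Pool

# The seven sample players from the question.
SAMPLE_PLAYERS = [
    {"people": "John Doe1", "index": 8, "value": 15},
    {"people": "John Doe2", "index": 7, "value": 28},
    {"people": "John Doe3", "index": 6, "value": 13},
    {"people": "John Doe4", "index": 7, "value": 11},
    {"people": "John Smith1", "index": 7, "value": 11},
    {"people": "John Smith2", "index": 6, "value": 9},
    {"people": "John Smith3", "index": 6, "value": 10},
]

def iterator_slice(iterable, length):
    """Yield successive tuples of at most `length` items from iterable."""
    iterator = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(iterator, length))
        if not chunk:
            return
        yield chunk

def filtered_combinations(players, r_values, max_index=30):
    """Yield every r-combination (for each r) whose total index is at most max_index."""
    for r in r_values:
        for combo in itertools.combinations(players, r):
            if sum(p["index"] for p in combo) <= max_index:
                yield combo

def chunk_best(combinations):
    """Worker task: (total_value, combination) for the best combination in one chunk."""
    scored = ((sum(p["value"] for p in c), c) for c in combinations)
    # key= compares only the totals, so ties never compare the dicts themselves.
    return max(scored, key=lambda item: item[0])

def best_overall(results):
    """Parent-side reduction of the per-chunk results; None if there were none."""
    return max(results, key=lambda item: item[0], default=None)

if __name__ == "__main__":
    chunks = iterator_slice(filtered_combinations(SAMPLE_PLAYERS, range(2, 4)), 3)
    with Pool(4) as pool:
        # Consume the results inside the with block: leaving it terminates the pool.
        best = best_overall(pool.imap_unordered(chunk_best, chunks))
    if best is None:
        print("no combination passed the filter")
    else:
        value, combo = best
        print(value, [p["people"] for p in combo])
```

With imap_unordered the chunks finish in no particular order, so if two chunks tie for the best value, which one wins can vary between runs; with this sample data the best value, 56, is unique.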