python multithreading multiprocessing python-multiprocessing reduce

Multiprocessed reduce function hangs when the task queue holds more than about 1200 elements in Python


The program works fine until the size of the to_sum list reaches about 1150. Beyond that, the processes hang at the first point where task_queue = result_queue: they successfully fill the result queue and terminate, but then the program hangs. This does not happen if the list has fewer than about 1150 elements. Restarting the computer sometimes lets the program handle a larger list before it hangs, but the threshold is always in the 1100-1300 range. Do you have any idea what could cause this problem?

import multiprocessing

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Using a lock to avoid a race condition where 2 threads both get a number, then
            both to get another, but it is None so they both put the number back resulting in a result queue
            with bigger size. For example:
            Expected result_queue_size = 500, current_queue_size 499, after summation of 1 and 2 we will add 3 to result queue
            achieving 500.
            With race condition result is 501.
            [1, 2, None, None]
            Thread 1 gets 1.
            Thread 2 gets 2.
            Thread 1 gets None and puts 1 in result queue instead of getting 2 and summing.
            Thread 2 gets None and puts 2 in result queue instead of just getting the first None and returning.
            A lock on both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        #Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    #Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        #Cannot compute sum of 1 number so just add number_1 to result_queue and terminate since poison pill
                        #acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    self.result_queue.put(number_1)
                    #Queue empty, put the 1 number in result queue and terminate.
                    return
            self.result_queue.put(number_1 + number_2)


def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()
    task_queue = multiprocessing.JoinableQueue()
    [task_queue.put(element) for element in to_sum]
    task_queue_size = len(array)

    while task_queue_size > 1:
        print(task_queue.qsize(), task_queue_size)
        result_queue = multiprocessing.JoinableQueue()
        processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                    for i in range(8)]
        [task_queue.put(None) for process in processes]
        [process.start() for process in processes]
        #[process.join() for process in processes]
        task_queue.join()
        task_queue = result_queue
        task_queue_size = task_queue_size // 2 + task_queue_size % 2
    return result_queue.get()

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    """
    If range is below 1200, the program will run and compute everything correctly.
    If it is above it, it will hang at the first halving, the moment the first task_queue is empty and the
    result_queue becomes the new task_queue. 
    Computer restart will make the range values fluctuate, yesterday it would hang at 1177 but run fine up to 1776.
    Queue pipe full???
    """
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

Solution

  • This is too long for a comment, and so:

    First and foremost, calls to empty and qsize on multiprocessing queues are unreliable and should not be used (see the documentation). Second, the lock you are using prevents any real multiprocessing from occurring, since the bulk of the processing done in your run method is executed serially. Third, adding up 1350 numbers requires 1349 additions (how could it be otherwise?). So simply divide your 1350 numbers into 8 lists as evenly as possible, have each of your processes sum its list and return the result, then add the 8 returned values to get the final result.

    As to_sum grows, the final addition of the 8 partial sums contributes less and less to the total running time.

    import multiprocessing
    from functools import reduce
    from operator import add
    
    class CustomProcess(multiprocessing.Process):
    
        def __init__(self, name, task_queue, result_queue, *args, **kwargs):
            super().__init__(name=name, *args, **kwargs)
            self.task_queue = task_queue
            self.result_queue = result_queue
    
        def run(self):
            self.result_queue.put(reduce(add, self.task_queue.get(), 0))
    
    def split(iterable, n):  # function to split iterable in n even parts
        if type(iterable) is range and iterable.step != 1:
            # algorithm doesn't work with steps other than 1:
            iterable = list(iterable)
        l = len(iterable)
        n = min(l, n)
        k, m = divmod(l, n)
        return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
    
    N_PROCESSES = 8
    
    def multiprocess_sum(l):
        if len(l) == 1:
            return l[0]
    
        task_queue = multiprocessing.Queue()
        lists = split(l, N_PROCESSES)
        for l in lists:
            task_queue.put(l)
    
        result_queue = multiprocessing.Queue()
    
        processes = [
            CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue)
            for i in range(N_PROCESSES)
        ]
        for process in processes:
            process.start()
    
        the_sum = reduce(add, (result_queue.get() for _ in range(N_PROCESSES)), 0)
    
        for process in processes:
            process.join()
    
        return the_sum
    
    if __name__ == "__main__":
        to_sum = list(range(1350))
        print(sum(to_sum))
        for i in range(5):
            print(multiprocess_sum(to_sum))
    

    Prints:

    910575
    910575
    910575
    910575
    910575
    910575
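
    To make the "as evenly as possible" split concrete, here is a small sketch that reuses the split logic from the code above (returning a list instead of a generator, purely so the parts can be inspected twice) and shows the part sizes for 1350 elements and 8 processes. The names sizes and partial_sums are only for illustration.

```python
def split(iterable, n):
    """Split a sequence into at most n contiguous parts of near-equal length."""
    length = len(iterable)
    n = min(length, n)
    k, m = divmod(length, n)
    # The first m parts get k + 1 items, the remaining parts get k items.
    return [iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]

to_sum = list(range(1350))
parts = split(to_sum, 8)
sizes = [len(part) for part in parts]
partial_sums = [sum(part) for part in parts]

print(sizes)                             # [169, 169, 169, 169, 169, 169, 168, 168]
print(sum(partial_sums) == sum(to_sum))  # True
```

    Each process therefore performs at most 168 additions on its own part, and only 7 more additions are needed to combine the 8 partial sums.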
    

    I wonder how large the to_sum list must be to gain any time savings by using multiprocessing for this particular problem.
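
    One way to explore that question is to time both approaches on lists of increasing size. The following is a rough sketch rather than a rigorous benchmark: chunked, parallel_sum and time_call are hypothetical helper names introduced here, the list sizes are arbitrary, and the pool timing deliberately includes the cost of starting the worker processes and pickling the data.

```python
import multiprocessing
import time

def chunked(data, n):
    """Split data into at most n contiguous slices (the last may be shorter)."""
    size = -(-len(data) // n)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

def parallel_sum(data, n_processes=8):
    """Sum slices of data in worker processes, then add the partial sums."""
    with multiprocessing.Pool(n_processes) as pool:
        return sum(pool.map(sum, chunked(data, n_processes)))

def time_call(func, *args):
    """Return (result, elapsed seconds) for a single call."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start

if __name__ == "__main__":
    for size in (1_350, 100_000, 1_000_000):
        data = list(range(size))
        serial, t_serial = time_call(sum, data)
        parallel, t_parallel = time_call(parallel_sum, data)
        assert serial == parallel
        print(f"{size:>9}: sum() {t_serial:.4f}s, pool {t_parallel:.4f}s")
```

    The timings depend on the machine and on the process start method, so no numbers are given here. For an operation as cheap as integer addition, the cost of sending the data to the workers is likely to matter a great deal.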

    Update

    To do the reduction per the OP's attempt, I would use the following code. Since calls to qsize on a queue are not reliable, we keep track of the number of items on a queue using a sharable integer Value that must be incremented or decremented under control of a lock. We also create the processes once. Read the comments, please.

    import multiprocessing
    
    class CustomProcess(multiprocessing.Process):
    
        def __init__(self,
                     name,
                     task_queue,
                     result_queue,
                     task_queue_size,
                     result_queue_size,
                     condition,
                     *args,
                     **kwargs
                     ):
            super().__init__(name=name, *args, **kwargs)
            self.name = name
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.task_queue_size = task_queue_size
            self.result_queue_size = result_queue_size
            self.condition = condition
    
        def run(self):
            task_queue = self.task_queue
            result_queue = self.result_queue
            task_queue_size = self.task_queue_size
            result_queue_size = self.result_queue_size
            condition = self.condition
    
            task_queue_size_lock = task_queue_size.get_lock()
    
            while True:
                # When the task queue size goes down to zero, the main process
                # will move all the items from the result queue to the
                # task queue and then set the new task queue size.
                # We must not attempt to process the task queue while
                # this occurs, i.e. we must wait until the task queue size
                # is again non-zero:
                with condition:
                    while task_queue_size.value == 0:
                        condition.wait()
    
                # No need to acquire lock for this test:
                if task_queue_size.value < 0:
                    return # We are done
    
                # There is no guarantee we will find anything on the input queue:
                task_queue_size_lock.acquire()
                if task_queue_size.value == 0:
                    task_queue_size_lock.release()
                    continue
    
                number_1 = task_queue.get()
                task_queue_size.value -= 1
                if task_queue_size.value == 0:
                    # put number on result_queue:
                    task_queue_size_lock.release()
                    result_queue.put(number_1)
                    with result_queue_size.get_lock():
                        result_queue_size.value += 1
                    task_queue.task_done()
                else:
                    number_2 = task_queue.get()
                    task_queue_size.value -= 1
                    task_queue_size_lock.release()
                    # No lock is held for the actual reduction operation:
                    result_queue.put(number_1 + number_2)
                    with result_queue_size.get_lock():
                        result_queue_size.value += 1
                    # Since we have taken 2 elements off the task queue:
                    task_queue.task_done()
                    task_queue.task_done()
    
    def multiprocess_sum(array):
        n = len(array)
        if n == 1:
            return array[0]
    
        task_queue = multiprocessing.JoinableQueue()
        # You should be iterating over array, not to_sum, and
        # using a comprehension for its side effect is not Pythonic:
        for element in array:
            task_queue.put(element)
        task_queue_size = multiprocessing.Value('i', n)
    
        result_queue = multiprocessing.Queue()
        result_queue_size = multiprocessing.Value('i', 0)
    
        condition = multiprocessing.Condition()
    
        processes = [
            CustomProcess(name=str(i),
                          task_queue=task_queue,
                          result_queue=result_queue,
                          task_queue_size=task_queue_size,
                          result_queue_size=result_queue_size,
                          condition=condition,
                          )
            for i in range(8)
        ]
        for process in processes:
            process.start()
    
        while True:
            print('task queue size:', n)
    
            # Wait for task_queue to be emptied:
            task_queue.join()
    
            # Now we can be sure that the child processes are no longer retrieving from the task queue
            # and putting items on the result queue:
    
            n = result_queue_size.value
            if n == 1:
                print('task queue size:', n)
                result = result_queue.get()
                # Tell child processes to terminate:
                with condition:
                    task_queue_size.value = -1
                    condition.notify_all()
                return result
    
            # Child processes get their input from task queue.
            # So we must get items from the result queue and move them to the task queue.
            # The task queue size is now 0 so this should pause the child processes.
            for _ in range(n):
                task_queue.put(result_queue.get())
            result_queue_size.value = 0
            # Allow children to run
            with condition:
                # The new n value will be half of the previous n value (more or less)
                task_queue_size.value = n
                condition.notify_all()
    
    if __name__ == "__main__":
        to_sum = [i for i in range(1350)]
        print(sum(to_sum))
    
    
        for i in range(5):
            print(multiprocess_sum(to_sum))
    

    Prints:

    910575
    task queue size: 1350
    task queue size: 675
    task queue size: 338
    task queue size: 169
    task queue size: 85
    task queue size: 43
    task queue size: 22
    task queue size: 11
    task queue size: 6
    task queue size: 3
    task queue size: 2
    task queue size: 1
    910575
    task queue size: 1350
    task queue size: 675
    task queue size: 338
    task queue size: 169
    task queue size: 85
    task queue size: 43
    task queue size: 22
    task queue size: 11
    task queue size: 6
    task queue size: 3
    task queue size: 2
    task queue size: 1
    910575
    task queue size: 1350
    task queue size: 675
    etc.
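
    The shared counter technique used above can be seen in isolation in the following minimal sketch. It assumes nothing beyond the standard library; add_many and count_with_processes are hypothetical names chosen for this illustration and are not part of the code above.

```python
import multiprocessing

def add_many(counter, times):
    """Increment the shared counter `times` times, holding its lock for each update."""
    for _ in range(times):
        # `counter.value += 1` is a read followed by a write, so it is not
        # atomic; the lock bundled with the Value makes the update safe.
        with counter.get_lock():
            counter.value += 1

def count_with_processes(n_processes=4, times=1000):
    """Start n_processes workers that all update one shared integer."""
    counter = multiprocessing.Value('i', 0)  # 'i' = signed C int, initial value 0
    workers = [
        multiprocessing.Process(target=add_many, args=(counter, times))
        for _ in range(n_processes)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return counter.value

if __name__ == "__main__":
    print(count_with_processes())  # 4 workers x 1000 increments = 4000
```

    Unlike qsize, a counter maintained this way reflects exactly the updates the processes have made, which is why the code above relies on it to decide when a round of the reduction has finished.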
    

    Update 2

    Simpler still is to use a multiprocessing pool and pass pairs of items to be reduced to the worker function:

    import multiprocessing
    from itertools import chain
    
    def custom_process(pair):
        return pair[0] + pair[1] if len(pair) == 2 else pair[0]
    
    def multiprocess_sum(array):
        n = len(array)
        if n == 1:
            return array[0]
    
        results = array
        pool = multiprocessing.Pool(8)
        while True:
            print('n =', n)
            if n == 1:
                break
    
            # Create pairs:
            pairs = zip(results[:n//2], results[n//2:])
            # Did we start with an odd number of elements?
            if n % 2 == 1:
                pairs = chain(pairs, ((results[-1],),))
            # A chunksize argument could be passed to imap_unordered for improved performance:
            results = list(pool.imap_unordered(custom_process, pairs))
            n = len(results)
        result = results[0]
        pool.close()
        pool.join()
        return result
    
    if __name__ == "__main__":
        to_sum = [i for i in range(1350)]
        print(sum(to_sum))
    
    
        for i in range(5):
            print(multiprocess_sum(to_sum))
    

    Prints:

    910575
    n = 1350
    n = 675
    n = 338
    n = 169
    n = 85
    n = 43
    n = 22
    n = 11
    n = 6
    n = 3
    n = 2
    n = 1
    910575
    n = 1350
    n = 675