Tags: python, multithreading, list, multiprocessing, pool

Multiprocessing nested for loop with counter


I am looking for a simple solution that can help me use the full power of my PC to process my data. I think dividing the task across different cores would help reduce the processing time, but I do not know how to do it. I have searched Stack Overflow for similar problems, but no solution resolved mine. I am processing data of around length 3000, and since I am using a nested for loop to find the number of similar (within a ±0.5 range) elements in the list, it runs 3000 × 3000 times, which takes around 2 minutes; I want to reduce the time taken.

repeat= []
values = []
for i in completeList:
    count = 0
    for j in completeList:
        if isfloat(i) and isfloat(j):
            if float(i)-0.5 <= float(j) <= float(i)+0.5:
                count = count + 1
    repeat.append(count)
    values.append(i)

Any help would be appreciated.

With regards, Manish


Solution

  • Since you still have not posted the actual code for isfloat or shown what the elements of completeList look like, the best I can do is conjecture about what they might be. It makes a difference because, as I mentioned, the more CPU it takes to execute isfloat and to convert the elements of completeList with float, the greater the gains to be had from multiprocessing.

    For CASE 1, I am assuming that completeList is composed of strings, that isfloat needs to use a regular expression to determine whether a string matches our expected floating-point format, and that float therefore needs to convert from a string. This is what I would imagine to be the most CPU-intensive case. For CASE 2, completeList is composed of floats, isfloat just returns True, and float does not have to do any real conversion; the two assumed inputs are sketched just below.
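
    For concreteness, here is a minimal sketch of the two assumed inputs; the sample values are invented purely for illustration:

    # CASE 1 assumption: strings that must be validated with a regular
    # expression and then converted with float():
    complete_list = ['1.25', '2.70', '0.9', '1.60']

    # CASE 2 assumption: the elements are already floats, so isfloat can
    # just return True and float() has nothing real to do:
    complete_list = [1.25, 2.70, 0.9, 1.60]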

    My desktop has an 8-core processor:

    CASE 1

    import multiprocessing as mp
    import time
    import random
    import re
    from functools import partial
    
    def isfloat(s):
        # conjectured format: simple unsigned decimals such as '1.25'
        return re.fullmatch(r'\d*\.\d+', s) is not None
    
    def single_process(complete_list):
        #repeat = []
        values = []
        for idx_i, v_i in enumerate(complete_list):
            count = 0
            for idx_j, v_j in enumerate(complete_list):
                if idx_i == idx_j:
                    continue # don't compare an element with itself
                if isfloat(v_i) and isfloat(v_j):
                    f_i = float(v_i)
                    if f_i-0.5 <= float(v_j) <= f_i+0.5:
                        count = count + 1
            # the OP also appended each element to a second list, which would
            # just reproduce complete_list, so that step is omitted here:
            #repeat.append(v_i)
            values.append(count) # these are the counts (what the OP called 'repeat')
        return values
    
    
    def multi_worker(complete_list, index_range):
        values = []
        for idx_i in index_range:
            v_i = complete_list[idx_i]
            count = 0
            for idx_j, v_j in enumerate(complete_list):
                if idx_i == idx_j:
                    continue # don't compare an element with itself
                if isfloat(v_i) and isfloat(v_j):
                    f_i = float(v_i)
                    if f_i-0.5 <= float(v_j) <= f_i+0.5:
                        count = count + 1
            values.append(count) # these are actually counts
        return values
    
    
    def multi_process(complete_list):

        def split(a, n):
            # divide sequence a into n contiguous chunks whose lengths
            # differ by at most one
            k, m = divmod(len(a), n)
            return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

        n = len(complete_list)
        POOL_SIZE = mp.cpu_count()
        range_splits = split(range(0, n), POOL_SIZE)
        # partial binds complete_list as the first argument of multi_worker;
        # pool.map then calls it once per chunk of indices. The with-block
        # ensures the pool is terminated when we are done:
        with mp.Pool(POOL_SIZE) as pool:
            value_lists = pool.map(partial(multi_worker, complete_list), range_splits)
        values = []
        # join the per-chunk results back together, in order:
        for value_list in value_lists:
            values.extend(value_list)
        return values
    
    def main():
        # generate 3000 random numbers:
        random.seed(0)
        complete_list = [str(random.uniform(1.0, 3.0)) for _ in range(3000)]
        t = time.time()
        values = single_process(complete_list)
        print(time.time() - t, values[0:10], values[-10:-1])
    
        t = time.time()
        values = multi_process(complete_list)
        print(time.time() - t, values[0:10], values[-10:-1])
    
    
    # required for Windows:
    if __name__ == '__main__':
        main()
    

    Prints:

    27.7540442943573 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
    7.187546253204346 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
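
    The split helper hands each of the POOL_SIZE workers one contiguous block of the index range, with block lengths differing by at most one, so the work is spread evenly; partial then binds complete_list as the first argument of multi_worker, so pool.map only has to pass each block. A quick standalone illustration of the chunking (the sizes here are made up just to show the behavior):

    def split(a, n):
        # same helper as in multi_process above
        k, m = divmod(len(a), n)
        return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

    # splitting 10 indices across 3 workers gives blocks of sizes 4, 3, 3:
    print(list(split(range(10), 3)))  # [range(0, 4), range(4, 7), range(7, 10)]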
    

    CASE 2

    import multiprocessing as mp
    import time
    import random
    from functools import partial
    
    def isfloat(s):
        # CASE 2: the elements are already floats, so no validation is needed
        return True
    
    def single_process(complete_list):
        values = []
        for idx_i, v_i in enumerate(complete_list):
            count = 0
            for idx_j, v_j in enumerate(complete_list):
                if idx_i == idx_j:
                    continue # don't compare an element with itself
                if isfloat(v_i) and isfloat(v_j):
                    f_i = float(v_i)
                    if f_i-0.5 <= float(v_j) <= f_i+0.5:
                        count = count + 1
            values.append(count) # these are actually counts
        return values
    
    
    def multi_worker(complete_list, index_range):
        values = []
        for idx_i in index_range:
            v_i = complete_list[idx_i]
            count = 0
            for idx_j, v_j in enumerate(complete_list):
                if idx_i == idx_j:
                    continue # don't compare an element with itself
                if isfloat(v_i) and isfloat(v_j):
                    f_i = float(v_i)
                    if f_i-0.5 <= float(v_j) <= f_i+0.5:
                        count = count + 1
            values.append(count) # these are actually counts
        return values
    
    
    def multi_process(complete_list):

        def split(a, n):
            # divide sequence a into n contiguous chunks whose lengths
            # differ by at most one
            k, m = divmod(len(a), n)
            return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

        n = len(complete_list)
        POOL_SIZE = mp.cpu_count()
        range_splits = split(range(0, n), POOL_SIZE)
        # partial binds complete_list as the first argument of multi_worker;
        # pool.map then calls it once per chunk of indices. The with-block
        # ensures the pool is terminated when we are done:
        with mp.Pool(POOL_SIZE) as pool:
            value_lists = pool.map(partial(multi_worker, complete_list), range_splits)
        values = []
        # join the per-chunk results back together, in order:
        for value_list in value_lists:
            values.extend(value_list)
        return values
    
    def main():
        # generate 3000 random numbers:
        random.seed(0)
        complete_list = [random.uniform(1.0, 3.0) for _ in range(3000)]
        t = time.time()
        values = single_process(complete_list)
        print(time.time() - t, values[0:10], values[-10:-1])
    
        t = time.time()
        values = multi_process(complete_list)
        print(time.time() - t, values[0:10], values[-10:-1])
    
    
    # required for Windows:
    if __name__ == '__main__':
        main()
    

    Prints:

    4.181002378463745 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
    1.325998067855835 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
    

    Results

    For CASE 1 the speedup was 3.86; for CASE 2 it was only about 3.15. That is what we would expect: the less CPU work there is per element, the larger the share of the total running time taken by the fixed overhead of starting the pool and moving data between processes, and the smaller the gain from multiprocessing. If you are also free to change the algorithm itself, see the note below.
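
    As an aside, if you can change the algorithm rather than only parallelize it, sorting the values once and counting neighbors with bisect turns the O(n²) double loop into O(n log n), which for n = 3000 should comfortably beat both versions above on a single core. A minimal sketch, assuming the CASE 2 situation where the elements are already floats (window_counts is just an illustrative name):

    import bisect

    def window_counts(values, radius=0.5):
        # for each v, count how many other elements lie in [v - radius, v + radius]
        s = sorted(values)
        counts = []
        for v in values:
            lo = bisect.bisect_left(s, v - radius)
            hi = bisect.bisect_right(s, v + radius)
            counts.append(hi - lo - 1)  # subtract 1 so v does not count itself
        return counts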