Search code examples
pythonparallel-processingmultiprocessing

How do I apply multiprocessing to a function in the inner loop of a nested loop while still running code in the outer loop?


I'm trying to implement the algorithm from this paper, seen below.

import gzip
import numpy as np

k = 16
training_set = np.array([('TCCCTACACT', 9),
          ('AGTTGGTATT', 12),
          ('AGTGGATCAC', 8),
          ('CGCAAGTGTG', 3),
          ('CTGTTCCCCC', 7),
          ('CGACTGGTAA', 10),
          ('CGCCGAGAAG', 4),
          ('TAGCTACGAC', 5),
          ('CCTTGCGCGT', 11),
          ('CCAAAAGAAA', 12),
          ('CCTCAGGAGG', 6),
          ('AGGCCACTTA', 5),
          ('GCGGGAACGG', 5),
          ('CTATTACCAA', 2),
          ('ACACTTTTTT', 8),
          ('GATGCAGCGT', 1)])
test_set= np.array([('GATGCAGCGT', 3),
           ('GATGCAGCGT', 0),
           ('GATGCAGCGT', 3),
           ('GATGCAGCGT', 4)])
for ( x1, _ ) in test_set:
    Cx1 = len( gzip . compress ( x1.encode() ) )
    distance_from_x1 = []
    for ( x2, _ ) in training_set:
        Cx2 = len( gzip.compress(x2.encode()))
        x1x2 = " ".join([ x1 , x2 ])
        Cx1x2 = len( gzip.compress ( x1x2.encode() ))
        ncd = ( Cx1x2 - min( Cx1 , Cx2 ) ) / max(Cx1 , Cx2 )
        distance_from_x1.append( ncd )
    sorted_idx = np.argsort ( np.array(distance_from_x1 ) )

    top_k_class = list(training_set [ sorted_idx[: k] , 1])
    predict_class = max(set( top_k_class ) ,key = top_k_class.count )
    print(predict_class)

However, as the computation time explodes with the length of the dataset, I'm trying to implement multiprocessing. My intuition says that multiprocessing the inner loop would be the biggest time saver. I've seen answers to other questions on how to do multiprocessing on nested loops, but none with the caveat that other code still has to be run outside the inner loop but within the outer loop.

I've tried this code:

import gzip
import numpy as np
from multiprocessing import Pool, cpu_count
def calculate_ncd(x1, x2):
    Cx1 = len(gzip.compress(x1.encode()))
    Cx2 = len(gzip.compress(x2.encode()))
    x1x2 = " ".join([x1, x2])
    Cx1x2 = len(gzip.compress(x1x2.encode()))
    ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
    return ncd

def process_train_sample(x1, x2):
    return calculate_ncd(x1, x2)

def process_test_sample(test_sample):
    x1, _ = test_sample
    pool = Pool(cpu_count())
    distance_from_x1 = pool.starmap(process_train_sample, [([x1] * len(training_set), training_set)])
    pool.close()
    pool.join()
    sorted_idx = np.argsort(np.array(distance_from_x1))
    top_k_class = list(training_set[sorted_idx[:k], 1])
    predict_class = max(set(top_k_class), key=top_k_class.count)
    return predict_class

k = 16
training_set = np.array([('TCCCTACACT', 9),
          ('AGTTGGTATT', 12),
          ('AGTGGATCAC', 8),
          ('CGCAAGTGTG', 3),
          ('CTGTTCCCCC', 7),
          ('CGACTGGTAA', 10),
          ('CGCCGAGAAG', 4),
          ('TAGCTACGAC', 5),
          ('CCTTGCGCGT', 11),
          ('CCAAAAGAAA', 12),
          ('CCTCAGGAGG', 6),
          ('AGGCCACTTA', 5),
          ('GCGGGAACGG', 5),
          ('CTATTACCAA', 2),
          ('ACACTTTTTT', 8),
          ('GATGCAGCGT', 1)])
test_set= np.array([('GATGCAGCGT', 3),
           ('GATGCAGCGT', 0),
           ('GATGCAGCGT', 3),
           ('GATGCAGCGT', 4)])

results = [process_test_sample(sample) for sample in test_set]# Process test samples in parallel


print(results)  # Print the predicted classes

But from what I've seen managing the pool overhead in each iteration of the outer loop is inefficient and the computation time increases.

I would like to know how to use a pool for each turn of the inner loop without having to manage the overhead of the pool on each turn of the outer loop.

EDIT: I've added dummy data since the actual data is hundreds of thousands of nucleotides long.


Solution

  • You can try to leverage the shared memory (to not copy the values back and forth):

    import gzip
    import numpy as np
    
    from multiprocessing import Pool
    from multiprocessing.managers import SharedMemoryManager
    from multiprocessing.shared_memory import ShareableList
    
    
    def do_work(args):
        k = 16
    
        sl1, sl2, sl3, (x1, _) = args
        training_set = ShareableList(name=sl1.shm.name)
        training_set_lengths = ShareableList(name=sl2.shm.name)
        training_set_values = ShareableList(name=sl3.shm.name)
    
        Cx1 = len(gzip.compress(x1.encode()))
    
        distance_from_x1 = []
        for x2, Cx2 in zip(training_set, training_set_lengths):
            x1x2 = " ".join([x1, x2])
            Cx1x2 = len(gzip.compress(x1x2.encode()))
            ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
            distance_from_x1.append(ncd)
    
        sorted_idx = np.argsort(np.array(distance_from_x1))
        top_k_class = [training_set_values[idx] for idx in sorted_idx[:k]]
        predict_class = max(set(top_k_class), key=top_k_class.count)
        return predict_class
    
    
    if __name__ == '__main__':
        k = 16
        training_set = [
                ("TCCCTACACT", 9),
                ("AGTTGGTATT", 12),
                ("AGTGGATCAC", 8),
                ("CGCAAGTGTG", 3),
                ("CTGTTCCCCC", 7),
                ("CGACTGGTAA", 10),
                ("CGCCGAGAAG", 4),
                ("TAGCTACGAC", 5),
                ("CCTTGCGCGT", 11),
                ("CCAAAAGAAA", 12),
                ("CCTCAGGAGG", 6),
                ("AGGCCACTTA", 5),
                ("GCGGGAACGG", 5),
                ("CTATTACCAA", 2),
                ("ACACTTTTTT", 8),
                ("GATGCAGCGT", 1),
            ] # * 50_000
    
        test_set = np.array(
            [("GATGCAGCGT", 3), ("GATGCAGCGT", 0), ("GATGCAGCGT", 3), ("GATGCAGCGT", 4)] # * 10
        )
    
        with SharedMemoryManager() as smm, Pool() as pool:
            training_set_sl = smm.ShareableList([str(x2) for x2, _ in training_set])
            training_set_values = smm.ShareableList([int(val) for _, val in training_set])
            training_set_comp_lengths = smm.ShareableList([len(gzip.compress(x2.encode())) for x2, _ in training_set])
    
            for i, result in enumerate(pool.imap(do_work, ((training_set_sl, training_set_comp_lengths, training_set_values, t) for t in test_set), chunksize=1)):
                print(i, result)
    

    With training_set of length 800_000 and test_set of length 40 I've managed to run it under one minute (58,7s on my 8C/16T 5700X, Ubuntu 20.04, Python 3.10.9).