Search code examples
pythonclassmultiprocessinggenetic-algorithmunbound

Genetic algorithms -- Using multiprocessing inside of a class having an unbound method


I have been trying of late to parallelize some of my code (for speed) by resorting to the multiprocessing library inside of a class itself making use of an unbound method (basically, it is a user-supplied function held inside of a class attribute). It doesn't work at all.

Context: I'm trying to parallelize a "parallel genetic algorithms" class which, as the name strongly implies, is in itself an embarassingly parallel problem.

As far as I can say, there are 2 problems to be found within my code. (1) The user-supplied fitness function is not exported over the processes generated by the Pool object, and no amount of deep copying seems able to resolve it. (2) The other issue is that, perhaps, the Pool object is unsure about how to handle multi-dimensional outputs... I'm really unsure about this one tbh.

I've tried to work out a tiny, standalone version of my code just so to make things clearer (it doesn't currently run, because of the bugs, which is kind of the point):

from itertools import repeat
import multiprocessing as mp
import numpy as np

class GeneticAlgorithm:
    def __init__(self, I, G, D, U, fitness_function, run_parallel):
        self.fitness_function = fitness_function # User-supplied fitness function
        self.D = D # Problem size (number of genes)
        self.I = I # Population size (number of individuals)
        self.G = G # Number of generations
        self.U = U # Number of parallel populations
        self.run_parallel = run_parallel

    def sga(self):
        '''One single-threaded genetic algorithm'''
        # Activation rate is fixed at 0.5 for the sake of this MWE
        pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population

        for g in range(self.G):
            # fitness is computed for all individuals
            fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
            # fitness is scaled back to 100%
            fitpop /= np.sum(fitpop)
            # 2I parents are selected at random according to each individual's relative fitness score
            parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
            # Parents are crossed 2 by 2, with each pair producing exactly one offspring
            crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
            embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
            # Mutation rate is fixed at 1/D for the sake of this MWE
            mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
            # "Mutated embryos" become fully fledged individuals and replace the parent generation
            pop = (1 - mutations) * embryos + mutations * (1 - embryos)

        # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
        return pop.mean(axis=0)

    def pga(self):
        '''Multiple parallel genetic algorithms'''
        if self.run_parallel:
            p = mp.Pool(mp.cpu_count())
            universes = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
            p.close()
            p.join()
        else:
            universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
            
            for u in range(self.U):
                universes[u, :] = self.sga()

        # Multiple GAs are aggregated in a sort of "mean of means"
        return universes.mean(axis=0)

if __name__ == '__main__':
    def my_fitness_function(ind):
        '''Dummy fitness function, scores all individual equally...'''
        return 1.0

    # Dummy test to check if the code runs... it doesn't :(
    ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
    print(ga.pga())

Any kind of hint, piece of code or complete solution will be most welcomed. This used to be fairly easy in R, but with Python I'm apparently at my wits' end... thanks!

ETA: Fixed a few typos in the code and added the run_parallel argument to show that it runs perfectly fine without parallelization. Oh yeah, also, I run on Windows, otherwise I might have tried that Ray library which, I'm told, works wonders, especially when compared to multiprocessing.


Solution

  • See my comment to your post. Also, each process in the pool should seed the random number generator uniquely. Otherwise, they would be generating the same sequence of random numbers.

    Note that multiprocessing will not necessarily run faster due to the overhead in creating the process pool and in passing arguments and results from one process's address space to another address space. Your worker function, sga, has to be sufficiently CPU-intensive for multiprocessing to be advantageous, which I now understand that in actuality it is.

    from itertools import repeat
    import multiprocessing as mp
    import numpy as np
    
    # this will be executed by each process in the pool:
    def init_pool():
        from threading import current_thread
        ident = current_thread().ident
        np.random.seed(ident)
    
    def my_fitness_function(ind):
        '''Dummy fitness function, scores all individual equally...'''
        return 1.0
    
    class GeneticAlgorithm:
        def __init__(self, I, G, D, U, fitness_function, run_parallel):
            self.fitness_function = fitness_function # User-supplied fitness function
            self.D = D # Problem size (number of genes)
            self.I = I # Population size (number of individuals)
            self.G = G # Number of generations
            self.U = U # Number of parallel populations
            self.run_parallel = run_parallel
    
        def sga(self):
            '''One single-threaded genetic algorithm'''
            # Activation rate is fixed at 0.5 for the sake of this MWE
            pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population
    
            for g in range(self.G):
                # fitness is computed for all individuals
                fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
                # fitness is scaled back to 100%
                fitpop /= np.sum(fitpop)
                # 2I parents are selected at random according to each individual's relative fitness score
                parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
                # Parents are crossed 2 by 2, with each pair producing exactly one offspring
                crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
                embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
                # Mutation rate is fixed at 1/D for the sake of this MWE
                mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
                # "Mutated embryos" become fully fledged individuals and replace the parent generation
                pop = (1 - mutations) * embryos + mutations * (1 - embryos)
    
            # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
            return pop.mean(axis=0)
    
        def pga(self):
            '''Multiple parallel genetic algorithms'''
            universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
            if self.run_parallel:
                pool_size = min(mp.cpu_count(), self.U)
                p = mp.Pool(pool_size, initializer=init_pool)
                results = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
                p.close()
                p.join()
                for u in range(self.U):
                    universes[u, :] = results[u]
            else:
                for u in range(self.U):
                    universes[u, :] = self.sga()
    
            # Multiple GAs are aggregated in a sort of "mean of means"
            return universes.mean(axis=0)
    
    if __name__ == '__main__':
        # Dummy test to check if the code runs... it doesn't :(
        ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
        print(ga.pga())
    

    Prints:

    [0.56 0.46 0.38 0.54 0.52]