
Using Pygame with parallelism in Python


I am trying to train a neural network to play an SMB1 game made using Pygame. To get some speedup, I would like to use parallel processing to play multiple copies of the game at once, with different members of the population (and on different training data).

The root of my problem is that Pygame isn't inherently instance-based; it will only ever generate one window, with one display object. Because I can't create a separate Pygame window and display object for each process, the processes would have to share a single display. This leads me to my first questions: is there a way to have multiple instances of Pygame, and if not, is there a (performance-light) method of drawing onto the display concurrently, i.e. each game drawing to a different section of the whole window?

However, I don't really need every game to be rendered; I only care that at least one game instance is rendered so that I can monitor its progress. My solution was to assign each game a process id and have only the game with process id 0 actually draw to the display. Concurrency problem solved! To accomplish this, I used multiprocessing.Process:

processes = [];
genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
for i in range(self.runConfig.parallel_processes):
    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
    processes.append(process);
for process in processes:
    process.start();
for process in processes:
    process.join();

However, this caused a problem of its own when multiprocessing pickled the objects: AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'. (Note: config and RunnerConfig are two different things. config comes from the neat library I'm using and is what gets passed to the function; RunnerConfig is my own class, a property of the class from which the processes are started.)

After some research, it seems that because I am using an instance method as the process target, multiprocessing pickles the instance itself, which includes the above RunnerConfig, which contains lambda functions, which are not pickleable. This is super hard to work around because those lambda functions are used specifically in self.eval_genome_batch. This leads to the second question: is it possible to use multiprocessing.Process in a way that doesn't need to pickle the enclosing instance, so that the lambda functions aren't pickled?
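
Here's a stripped-down sketch that reproduces the failure (hypothetical names, not my real classes; the key point is that Windows uses the spawn start method, which has to pickle the Process target):

import multiprocessing

class Config:
    def __init__(self):
        # a lambda defined inside __init__, just like RunnerConfig's aggregation functions
        self.aggregate = lambda fitnesses: sum(fitnesses) / len(fitnesses)

class Runner:
    def __init__(self):
        self.config = Config()

    def work(self, batch):
        print(self.config.aggregate(batch))

if __name__ == '__main__':
    runner = Runner()
    # target=runner.work is a bound method, so pickling it pickles `runner`,
    # which pickles runner.config, whose lambda cannot be pickled
    process = multiprocessing.Process(target=runner.work, args=([1, 2, 3],))
    process.start()  # AttributeError: Can't pickle local object 'Config.__init__.<locals>.<lambda>'
    process.join()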

Finally, after more research, it turns out that instead of using multiprocessing, which uses pickle, I could use pathos.multiprocessing, which uses dill instead. Dill can pickle lambda functions. Hooray!
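
A quick stand-alone check confirms this (nothing here is specific to my code):

import pickle
import dill

f = lambda fitnesses: sum(fitnesses) / len(fitnesses)

try:
    pickle.dumps(f)
except Exception as e:
    print('pickle:', e)  # pickle refuses the lambda

print(len(dill.dumps(f)), 'bytes')  # dill serializes it without complaint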

But no. There's one final f*ck you. pathos.multiprocessing only offers .map and its map-style equivalents from multiprocessing, which don't let me control the processes themselves. That means that when the function is called, there is no way (afaik) to tell the program which worker process the game is being run from, and therefore whether to render to the screen. So the final question is: is there a way to use pathos.multiprocessing.map (or, really, any library's parallel map function) such that it a) doesn't break on lambda functions and b) can tell the function being called which process worker is being used?

Final note: I know the simplest answer would just be to not render to Pygame. That would fix every problem. However, being able to see the program progressing and learning is very useful and important to me.

So, I have a list of different problems, any of which, if solved, would fix everything:

  • a way to use pygame as multiple different instances in different threads, spawned from the same process
  • a way to safely concurrently work with pygame's display (and update clock)
  • a way to use multiprocessing.Process such that it doesn't need to pickle the method's class but can still access the class variables
  • a multiprocessing library that:
    • Either doesn't need to pickle the lambda functions or is able to
    • Has a way to tell the subprocess which process worker is being used

EDIT: Here's most of the relevant code. Only the relevant methods are included because the classes are quite long. If you'd like, the full source code can be found on my GitHub.

game_runner_neat.py: the class in which the parallel processing is started

import neat
import baseGame
import runnerConfiguration
import os.path
import os
import visualize
import random
import numpy as np
#import concurrent.futures
import multiprocessing
from logReporting import LoggingReporter
from renderer import Renderer as RendererReporter
from videofig import videofig as vidfig
from neat.six_util import iteritems, itervalues

class GameRunner:

    #if using default version, create basic runner and specify game to run
    def __init__(self,game,runnerConfig):
        self.game = game;
        self.runConfig = runnerConfig;

    #skip some code


    #parallel version of eval_genomes_feedforward
    def eval_genome_batch_feedforward(self,genomes,config,processNum):
        for genome_id, genome in genomes:
            genome.fitness += self.eval_genome_feedforward(genome,config,processNum=processNum);

    
    def eval_training_data_batch_feedforward(self,genomes,config,data,processNum,lock):
        for datum in data:
            for genome_id,genome in genomes:
                genome.increment_fitness(lock,self.eval_genome_feedforward(genome,config,processNum=processNum,trainingDatum=datum)); #increment_fitness allows multiple threads to change the fitness of the same genome safely

    #evaluate a population with the game as a feedforward neural net
    def eval_genomes_feedforward(self, genomes, config):
        for genome_id,genome in genomes:
            genome.fitness = 0; #sanity check
        if (self.runConfig.training_data is None):
            if (self.runConfig.parallel):
                processes = [];
                genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
                    processes.append(process);
                for process in processes:
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for genome_id, genome in genomes:
                    genome.fitness += self.eval_genome_feedforward(genome,config)
        else:
            if (self.runConfig.parallel):
                processes = [];
                data_batches = np.array_split(self.runConfig.training_data,self.runConfig.parallel_processes);
                lock = multiprocessing.Lock();
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_training_data_batch_feedforward,args=(genomes,config,data_batches[i],i,lock));
                    processes.append(process);
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for datum in self.runConfig.training_data:
                    for genome_id, genome in genomes:
                        genome.fitness += self.eval_genome_feedforward(genome,config,trainingDatum=datum)

runnerConfiguration.py (the class with the lambda functions; an instance is passed to GameRunner in __init__):

class RunnerConfig:

    def __init__(self,gameFitnessFunction,gameRunningFunction,logging=False,logPath='',recurrent=False,trial_fitness_aggregation='average',custom_fitness_aggregation=None,time_step=0.05,num_trials=10,parallel=False,returnData=[],gameName='game',num_generations=300,fitness_collection_type=None):

        self.logging = logging;
        self.logPath = logPath;
        self.generations = num_generations;
        self.recurrent = recurrent;
        self.gameName = gameName;
        self.parallel = parallel;
        self.time_step = time_step;
        self.numTrials = num_trials;
        self.fitnessFromGameData = gameFitnessFunction;
        self.gameStillRunning = gameRunningFunction;
        self.fitness_collection_type = fitness_collection_type;

        self.returnData = returnData;
##        for (datum in returnData):
##            if (isinstance(datum,IOData)):
##                [returnData.append(x) for x in datum.getSplitData()];
##            else:
##                returnData.append(datum);
##        
        if (trial_fitness_aggregation == 'custom'):
            self.fitnessFromArray = custom_fitness_aggregation;

        if (trial_fitness_aggregation == 'average'):
            self.fitnessFromArray = lambda fitnesses : sum(fitnesses)/len(fitnesses);

        if (trial_fitness_aggregation == 'max'):
            self.fitnessFromArray = lambda fitnesses : max(fitnesses);

        if (trial_fitness_aggregation == 'min'):
            self.fitnessFromArray = lambda fitnesses : min(fitnesses);

gameFitnessFunction and gameRunningFunction are functions passed to customize training behavior.

When the program tries to run eval_genomes_feedforward with runnerConfig.parallel = True, I get the following full error message:

Traceback (most recent call last):
  File "c:/Users/harrison_truscott/Documents/GitHub/AI_game_router/Neat/smb1Py_runner.py", line 94, in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 75, in run
    winner = pop.run(self.eval_genomes,self.runConfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py", line 102, in run
    fitness_function(list(iteritems(self.population)), self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 204, in eval_genomes
    self.eval_genomes_feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 276, in eval_genomes_feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'

When the first process breaks, I then get a second error message, this time from the spawned child process failing partway through its bootstrapping:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 125, in _main
    prepare(preparation_data)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 263, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 96, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\smb1Py_runner.py", line 94, in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 75, in run
    winner = pop.run(self.eval_genomes,self.runConfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py", line 102, in run
    fitness_function(list(iteritems(self.population)), self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 204, in eval_genomes
    self.eval_genomes_feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 276, in eval_genomes_feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

As an aside, multiprocessing.freeze_support() is the very first function I call in the main file being run.
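
Judging by the second traceback, though, the child's bootstrap re-runs smb1Py_runner.py at import time (line 94 fires again), which suggests the process-spawning call also needs to sit under the __main__ guard; freeze_support() alone doesn't prevent that re-execution. Roughly (a sketch, not my exact file):

import multiprocessing

def main():
    # ... set up the game, config, and runner as in smb1Py_runner.py ...
    winner = runner.run(config, 'run_' + str(currentRun))

if __name__ == '__main__':
    multiprocessing.freeze_support()  # only matters for frozen executables
    main()  # nothing that spawns processes runs at import time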


Solution

  • I'll try to attack the main issues. My understanding of your actual problem is quite limited as I don't know what your code actually does.

    "a way to use pygame as multiple different instances in different threads, spawned from the same process"

    This doesn't work, as pygame is built on SDL2, which states: "You should not expect to be able to create a window, render, or receive events on any thread other than the main one."

    "a way to safely concurrently work with pygame's display (and update clock)"

    Same as above: the display only works in the main thread.

    "a way to use multiprocessing.Process such that it doesn't need to pickle the method's class but can still access the class variables"

    You could pickle the methods using something like dill, but it feels (to me) wrong to copy full-on Python objects between processes. I'd go for another solution.

    "a multiprocessing library that:"

    1. Either doesn't need to pickle the lambda functions or is able to

    You need to serialize Python objects in order to send them between processes.

    2. Has a way to tell the subprocess which process worker is being used

    I don't understand what this means.


    It seems to me that the problem could be solved with better separation of data and visualization. The training should have no knowledge about any visualization, as it's not dependent on how you want to display it. So there should not be any reason to share the pygame display.

    Once this is done, it shouldn't be too much of a problem (multi-threading always causes problems) to do what you're trying to do. Regarding the pickle issue: I'd try to avoid pickling Python objects and functions, and instead just pass basic primitives between threads and processes. It seems like you should be able to assign self.fitnessFromArray a simple int or string instead, and based on its value do the min/avg/max calculation in the thread/process.
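
    For example, something like this sketch (the names mirror your RunnerConfig, but I haven't run it against your code):

    # Module-level lookup table: the config stores only the *string key*,
    # so pickling a RunnerConfig no longer drags any lambdas along with it.
    AGGREGATORS = {
        'average': lambda fitnesses: sum(fitnesses) / len(fitnesses),
        'max': max,
        'min': min,
    }

    class RunnerConfig:
        def __init__(self, trial_fitness_aggregation='average', **kwargs):
            self.trial_fitness_aggregation = trial_fitness_aggregation  # plain string

        def fitnessFromArray(self, fitnesses):
            # resolved at call time, inside whichever process does the work
            return AGGREGATORS[self.trial_fitness_aggregation](fitnesses)

    A custom aggregation function would still need special treatment (e.g. being defined at module level so pickle can find it by name), but the built-in average/max/min cases become safe to send across processes.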

    If you want to do threading, then the main thread will take care of the rendering. It'll also spawn threads for the training. When the threads complete, they'll return their results (or put them in thread-safe storage) and the main thread will poll the data and display the results. If the work done by the training takes longer than one frame, then divide up the work so each thread only partially trains and can continue where it left off the next frame.
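
    A minimal sketch of that shape (train_batch is a stand-in for the real training work; nothing here assumes your actual classes):

    import queue
    import threading

    import pygame

    results = queue.Queue()  # thread-safe hand-off from workers to the main thread

    def train_batch(batch_id, batch):
        fitness = sum(batch) / len(batch)  # stand-in for evaluating one genome batch
        results.put((batch_id, fitness))

    def main():
        pygame.init()
        screen = pygame.display.set_mode((640, 480))
        clock = pygame.time.Clock()

        for i, batch in enumerate([[1, 2], [3, 4], [5, 6]]):
            threading.Thread(target=train_batch, args=(i, batch), daemon=True).start()

        finished = {}
        running = True
        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
            try:  # drain the queue without blocking, so the frame rate is unaffected
                while True:
                    batch_id, fitness = results.get_nowait()
                    finished[batch_id] = fitness
            except queue.Empty:
                pass
            screen.fill((0, 0, 0))
            # ... draw training progress from `finished` here ...
            pygame.display.flip()
            clock.tick(60)
        pygame.quit()

    if __name__ == '__main__':
        main()

    One caveat: CPython's GIL means threads won't make CPU-bound training any faster; this layout mainly keeps the rendering responsive and in the main thread.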

    The principle is the same if you instead want separate processes. The main process starts several training processes and connects to them via sockets. From the sockets, you'd poll information about the state of the program and display it. It would basically be a client-server architecture (albeit on localhost) where the training scripts are servers and the main process is a client.
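
    A sketch of that architecture using multiprocessing.connection, which wraps localhost sockets (the "training" loop is a stand-in and the port numbers are arbitrary):

    import time
    from multiprocessing import Process
    from multiprocessing.connection import Client, Listener, wait

    def training_server(address):
        # the "server": does the training and streams its state to the client
        with Listener(address) as listener:
            with listener.accept() as conn:
                for progress in range(10, 101, 10):
                    time.sleep(0.1)  # stand-in for one chunk of training work
                    conn.send({'worker': address[1], 'progress': progress})
                conn.send('done')

    def connect_with_retry(address, attempts=50):
        # the server may not have bound its socket yet when we first try
        for _ in range(attempts):
            try:
                return Client(address)
            except ConnectionRefusedError:
                time.sleep(0.1)
        raise ConnectionError('could not reach %s:%s' % address)

    if __name__ == '__main__':
        addresses = [('localhost', 6000 + i) for i in range(2)]
        servers = [Process(target=training_server, args=(a,)) for a in addresses]
        for server in servers:
            server.start()
        connections = [connect_with_retry(a) for a in addresses]
        while connections:
            for conn in wait(connections, timeout=1.0):  # poll whichever servers have news
                message = conn.recv()
                if message == 'done':
                    connections.remove(conn)
                else:
                    print(message)  # a real client would update the pygame display here
        for server in servers:
            server.join()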