Search code examples
pythonpython-3.xpygamemultiprocessingpython-multiprocessing

Using multiprocessing with pygame?


I'm trying to separate my input loop from my game logic in my simple snake game that I've made with pygame, but, I'm really struggling to figure out why nothing is happening when I run the program.

I've tried importing pygame in the subprocess, I checked for errors on the subprocess, and got nowhere. I looked on google, but I wasn't able to find any usable examples, or similar issues. Has anybody ever figured any of this stuff out?

Okay, here's the code:

import pygame
import time
import multiprocessing as mp
import random as rnd

pygame.init()


def event_to_dict(event: pygame.event) -> dict:
    return {
        'type': event.type,
        'key': event.key if event.type == pygame.KEYDOWN else None,
    }


class SnakeBoard:
    def __init__(self, rows: int, columns: int):
        self.rows = rows
        self.columns = columns
        self.vertices = []
        self.odd_column = False

        self.buff = []
        for _ in range(self.rows):
            self.buff.append([' ' for _ in range(self.columns)])

    def initialize(self):
        for r in range(self.rows):
            for c in range(self.columns):
                self.buff[r][c] = ' '
        self.odd_column = (self.columns >> 1) % 2 == 1
        self.buff[self.rows >> 1][self.columns >> 1] = '\u25cb'
        self.vertices = [(self.rows >> 1, self.columns >> 1)]

    def place_food(self):
        while True:
            r = rnd.randint(0, self.rows - 1)
            c = rnd.randint(0, self.columns - 1)
            codd = c % 2 == 1
            if (codd and self.odd_column or not codd and not self.odd_column) and self.buff[r][c] != '\u25cb':
                self.buff[r][c] = '\u25c9'
                break

    def tick(self, direction: int) -> bool:
        nr, nc = self.vertices[-1]

        if direction == 0:
            nr -= 1
        elif direction == 1:
            nc += 1
        elif direction == 2:
            nr += 1
        elif direction == 3:
            nc -= 1
        else:
            print("Invalid direction for snake")
            exit(1)

        if nr >= self.rows or nc >= self.columns or nr < 0 or nc < 0 or self.buff[nr][nc] == '\u25cb':
            return False

        self.vertices.append((nr, nc))
        self.vertices.pop(0)
        return True


class SnakeGame(SnakeBoard):
    def __init__(self, rows: int, columns: int):
        super().__init__(rows, columns)
        self.score = 0
        self.direction = 0
        self.initialize()
        self.place_food()

    def tick(self, direction: int = -1) -> bool:
        v = super().tick(self.direction if direction < 0 else direction)

        if self.buff[self.vertices[-1][0]][self.vertices[-1][1]] == '\u25c9':
            self.score += 1
            self.vertices.append(self.vertices[-1])
            self.place_food()

        for r in range(self.rows):
            for c in range(self.columns):
                if (r, c) in self.vertices:
                    self.buff[r][c] = '\u25cb'
                elif self.buff[r][c] != '\u25c9' and self.buff[r][c] != ' ':
                    self.buff[r][c] = ' '
        return v


class GameLoop(mp.Process):
    def __init__(self, q: object, size: list):
        super().__init__()
        self.q = q
        self.size = size

        self.g = SnakeGame(size[1] // 10, size[0] // 10)
        self.g.initialize()
        self.g.place_food()

        self.screen = None
        self.game_surf = None
        self.font = None

    def run(self) -> None:
        try:
            import pygame
            pygame.init()

            self.screen = pygame.display.set_mode(self.size)
            self.game_surf = pygame.Surface(self.size)
            self.font = pygame.font.SysFont('roboto', 16)

            is_running = True
            while is_running:
                if self.q.poll(0):
                    d = self.q.recv()
                    if d is not None:
                        if d['type'] == pygame.KEYDOWN:
                            if d['key'] == pygame.K_a:
                                self.g.direction = 3
                            elif d['key'] == pygame.K_s:
                                self.g.direction = 2
                            elif d['key'] == pygame.K_d:
                                self.g.direction = 1
                            elif d['key'] == pygame.K_w:
                                self.g.direction = 0
                            elif d['key'] == pygame.K_ESCAPE:
                                is_running = False
                    else:
                        is_running = False

                self.game_surf.fill((255, 255, 255))

                for ri, r in enumerate(self.g.buff):
                    for ci, c in enumerate(r):
                        if c == '\u25cb':
                            # print("Drawing a snake at {}, {}".format(ri * 10, ci * 10))
                            pygame.draw.circle(self.game_surf,
                                               (0, 0, 255),
                                               ((ci * 10) + 5, (ri * 10) + 5),
                                               5)
                        elif c == '\u25c9':
                            # wprint("Placing food at {}, {}".format(ci, ri))
                            pygame.draw.circle(self.game_surf,
                                               (0, 127, 255),
                                               ((ci * 10) + 5, (ri * 10) + 5),
                                               5)

                timg = self.font.render("Score: {}, Level: {}".format(self.g.score, self.g.score // 10 + 1),
                                        True,
                                        (0, 0, 0))

                self.screen.blit(self.game_surf, (0, 0))
                self.screen.blit(timg, (0, 0))
                pygame.display.flip()

                if self.g.tick():
                    time.sleep(1 / ((int(self.g.score / 10 + 1)) * 10))
                else:
                    timg = self.font.render("Game Over! Would you like to try again?", True, (0, 0, 0))
                    self.screen.blit(timg, ((self.size[0] >> 1) - 150, self.size[1] >> 1))
                    timg = self.font.render("Yes", True, (0, 0, 0))
                    btn_pos = ((self.size[0] >> 1) - 25, (self.size[1] >> 1) + 20)
                    self.screen.blit(timg, btn_pos)
                    pygame.display.flip()

                    while True:
                        event = pygame.event.wait()
                        if event.type == pygame.QUIT:
                            is_running = False
                            break
                        elif event.type == pygame.MOUSEBUTTONUP:
                            mx, my = pygame.mouse.get_pos()
                            if btn_pos[0] - 5 <= mx <= btn_pos[0] + 30 and btn_pos[1] - 5 <= my <= btn_pos[1] + 20:
                                self.g.initialize()
                                self.g.place_food()
                                self.g.score = 0
                                break
            self.q.close()
        except Exception as e:
            print(e)


if __name__ == '__main__':
    size = [800, 600]

    parent, child = mp.Pipe()
    p = GameLoop(child, size)
    p.start()

    running = True
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_ESCAPE:
                    running = False

            ed = event_to_dict(event)
            parent.send(ed)

    parent.close()
    p.join()
    pygame.quit()

Sorry, it's kinda strange, this was migrated from the console to pygame, so some of the logic is still using the unicode symbols.


Solution

  • Generally in GUI applications it's common to want to separate the GUI from the logic. There are benefits to doing this as it means your GUI remains responsive even if your logic is busy. However, in order to run things concurrently there are many drawbacks, including overheads. It's also important to know that python is not 'thread safe', so you can break things (see race conditions) if you're not careful.

    Simplified example with no concurrency

    Your example is quite complex so lets start with a simple example: A simple pygame setup with a moving dot

    import pygame
    import numpy as np
    
    # Initialise parameters
    #######################
    size = np.array([800, 600])
    position = size / 2
    direction = np.array([0, 1])  # [x, y] vector
    speed = 2
    running = True
    
    pygame.init()
    window = pygame.display.set_mode(size)
    pygame.display.update()
    
    # Game loop
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_w:
                    direction = np.array([0, -1])
                elif event.key == pygame.K_a:
                    direction = np.array([-1, 0])
                elif event.key == pygame.K_s:
                    direction = np.array([0, 1])
                elif event.key == pygame.K_d:
                    direction = np.array([1, 0])
    
        position += direction * speed
    
        if position[0] < 0 or position[0] > size[0] or position[1] < 0 or position[1] > size[1]:
            running = False
    
        pygame.time.wait(10)  # Limit the speed of the loop
    
        window.fill((0, 0, 0))
        pygame.draw.circle(window, (0, 0, 255), position, 10)
        pygame.display.update()
    
    pygame.quit()
    quit()
    
    

    We're going to split off the game logic from the gui

    Mutliprocessing and other options:

    So multiprocessing in python allows you to utilise multiple cores at the same time, through multiple interpreters. While this sounds good, as far as I/O goes: it comes with higher overheads and doesn't help at all (it will likely hurt your performance). Threading and asyncio both run on a single core i.e. they aren't 'parrallel' computing. But what they allow is to complete code while waiting for other code to finish. In other words you can input commands while your logic is running happily elsewhere.

    TLDR: as a general rule:

    • CPU Bound (100% of the core) program: use multiprocessing,
    • I/O bound program: use threading or asyncio

    Threaded version

    import pygame
    import numpy as np
    import threading
    import time
    
    class Logic:
        # This will run in another thread
        def __init__(self, size, speed=2):
            # Private fields -> Only to be edited locally
            self._size = size
            self._direction = np.array([0, 1])  # [x, y] vector, underscored because we want this to be private
            self._speed = speed
    
            # Threaded fields -> Those accessible from other threads
            self.position = np.array(size) / 2
            self.input_list = []  # A list of commands to queue up for execution
    
            # A lock ensures that nothing else can edit the variable while we're changing it
            self.lock = threading.Lock()
    
        def _loop(self):
            time.sleep(0.5)  # Wait a bit to let things load
            # We're just going to kill this thread with the main one so it's fine to just loop forever
            while True:
                # Check for commands
                time.sleep(0.01)  # Limit the logic loop running to every 10ms
    
                if len(self.input_list) > 0:
    
                    with self.lock:  # The lock is released when we're done
                        # If there is a command we pop it off the list
                        key = self.input_list.pop(0).key
    
                    if key == pygame.K_w:
                        self._direction = np.array([0, -1])
                    elif key == pygame.K_a:
                        self._direction = np.array([-1, 0])
                    elif key == pygame.K_s:
                        self._direction = np.array([0, 1])
                    elif key == pygame.K_d:
                        self._direction = np.array([1, 0])
    
                with self.lock:  # Again we call the lock because we're editing
                    self.position += self._direction * self._speed
    
                if self.position[0] < 0 \
                        or self.position[0] > self._size[0] \
                        or self.position[1] < 0 \
                        or self.position[1] > self._size[1]:
                    break  # Stop updating
    
        def start_loop(self):
            # We spawn a new thread using our _loop method, the loop has no additional arguments,
            # We call daemon=True so that the thread dies when main dies
            threading.Thread(target=self._loop,
                             args=(),
                             daemon=True).start()
    
    
    class Game:
        # This will run in the main thread and read data from the Logic
        def __init__(self, size, speed=2):
            self.size = size
            pygame.init()
            self.window = pygame.display.set_mode(size)
            self.logic = Logic(np.array(size), speed)
            self.running = True
    
        def start(self):
            pygame.display.update()
            self.logic.start_loop()
    
            # any calls made to the other thread should be read only
            while self.running:
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        self.running = False
                    elif event.type == pygame.KEYDOWN:
                        # Here we call the lock because we're updating the input list
                        with self.logic.lock:
                            self.logic.input_list.append(event)
    
                # Another lock call to access the position
                with self.logic.lock:
                    self.window.fill((0, 0, 0))
                    pygame.draw.circle(self.window, (0, 0, 255), self.logic.position, 10)
                    pygame.display.update()
    
            pygame.time.wait(10)
            pygame.quit()
            quit()
    
    
    if __name__ == '__main__':
        game = Game([800, 600])
        game.start()
    
    

    So what was achieved?

    Something light like this doesn't really need any performance upgrades. What this does allow though, is that the pygame GUI will remain reactive, even if the logic behind it hangs. To see this in action we can put the logic loop to sleep and see that we can still move the GUI around, click stuff, input commands etc.
    change:

    # Change this under _loop(self) [line 21]
    time.sleep(0.01)
    
    # to this
    time.sleep(2)
    
    # if we tried this in the original loop the program becomes glitchy