Tags: python, python-3.x, multiprocessing, global-variables, python-multiprocessing

Python multiprocessing forked process doesn't take updated global variable into process


Problem

I am trying to run a function with multiprocessing that reads a global variable defined in another function. The forked process won't see the defined or redefined value of this variable, even though the change is registered in the main process.

Code

The code provided will run, and you can see that the assertion verifies that the global variable is changed. The tasks print out their ppids and pids to make sure they are truly forked, but the number that they log is not the one defined in the start function.

from random import random
from time import sleep
from multiprocessing import current_process
from multiprocessing import get_context
from os import getpid, getppid
from logging.handlers import QueueHandler, QueueListener
import logging

def task(steps):
    logger = logging.getLogger('app')

    global queue
    logger.addHandler(QueueHandler(queue))

    logger.setLevel(logging.DEBUG)
    process = current_process()
    # Make sure process is a true fork
    logger.info(f'Child {process.name} starting. ppid: {getppid()}, pid:{getpid()}')

    # THIS PART GRABS THE WRONG NUMBER
    global number
    logger.info(f'Child {process.name} number is:{number} :)')

    for i in range(5):
        logger.debug(f'Child {process.name} step {i}.')
        sleep(random())
    # report final message
    logger.info(f'Child {process.name} done.')
    return f'Child {process.name} result.'

# closes the pool
def end():
    logger.info('Closing pool')
    pool.close()
    pool.join()

def work(taskCount = 5, steps=3):
    fakemap = []
    for i in range(taskCount):
        fakemap.append(pool.apply_async(task, args=(steps, )))

    for result in fakemap:
        # this isn't the most efficient way to do this
        print(result.get())

def start():
    global logger
    logger = logging.getLogger('app')

    ctx = get_context('fork')
    global queue  
    queue = ctx.Queue()

    consoleHandler = logging.StreamHandler()
    logger.addHandler(consoleHandler)
    #logger.addHandler(QueueListener(queue))
    logger.setLevel(logging.DEBUG)

    global pool
    pool = ctx.Pool()

    # Update the global var from within the function
    global number
    number = 42


if __name__ == '__main__':
    # global var
    number = 69
    start()
    logger.info('Main process started.')
    # global var is updated from function
    assert number == 42
    # configure child processes
    work(taskCount=3, steps=3)

    logger.info('Main process done.')
    end()

"Why are you doing it in this godforsaken way?"

My ultimate goal is to call this module from my Flask app: run the start function to initialize the queue that will be used for logging within the processes, and then run a task when the Flask endpoint is hit. The task could take a second, or probably a lot longer, but the "wrapper" (in this case the work function) will always return a value for the webserver to give back to the user.

What have you tried?

I've verified that before the process forks, the global variable has the correct value. I have verified that the process is actually forking. I've tried asking ChatGPT; it was very unhelpful in this case. I've tried everything. I am genuinely at my wits' end. If you are reading this, you are my last hope.


Solution

  • Thanks for the help. It turns out the worker processes are forked at the moment the pool is created, so the global variable has to be defined (or updated) before the pool is created for the change to be visible in the children.

    Changed start function:

    def start():
        global logger
        logger = logging.getLogger('app')
    
        ctx = get_context('fork')
        global queue  
        queue = ctx.Queue()
    
        consoleHandler = logging.StreamHandler()
        logger.addHandler(consoleHandler)
        #logger.addHandler(QueueListener(queue))
        logger.setLevel(logging.DEBUG)
    
        global number
        number = 42
        
        global pool
        pool = ctx.Pool()