I am trying to run a function with multiprocessing that reads a global variable defined in another function. The forked processes won't see the defined or redefined value of this variable, even though the change is registered in the main process.
The code below runs, and you can see that the assertion verifies that the global variable is changed. The tasks print out their ppids and pids to make sure they are truly forked, but the number they log is not the one defined in the start function.
from random import random
from time import sleep
from multiprocessing import current_process
from multiprocessing import get_context
from os import getpid, getppid
from logging.handlers import QueueHandler, QueueListener
import logging
def task(steps):
    logger = logging.getLogger('app')
    global queue
    logger.addHandler(QueueHandler(queue))
    logger.setLevel(logging.DEBUG)
    process = current_process()
    # Make sure the process is a true fork
    logger.info(f'Child {process.name} starting. ppid: {getppid()}, pid: {getpid()}')
    # THIS PART GRABS THE WRONG NUMBER
    global number
    logger.info(f'Child {process.name} number is: {number} :)')
    for i in range(steps):
        logger.debug(f'Child {process.name} step {i}.')
        sleep(random())
    # report final message
    logger.info(f'Child {process.name} done.')
    return f'Child {process.name} result.'
# closes the pool
def end():
    logger.info('Closing pool')
    pool.close()
    pool.join()
def work(taskCount=5, steps=3):
    fakemap = []
    for i in range(taskCount):
        fakemap.append(pool.apply_async(task, args=(steps,)))
    for result in fakemap:
        # this isn't the most efficient way to do this
        print(result.get())
def start():
    global logger
    logger = logging.getLogger('app')
    ctx = get_context('fork')
    global queue
    queue = ctx.Queue()
    consoleHandler = logging.StreamHandler()
    logger.addHandler(consoleHandler)
    #logger.addHandler(QueueListener(queue))
    logger.setLevel(logging.DEBUG)
    global pool
    pool = ctx.Pool()
    # Update the global var from within the function
    global number
    number = 42
if __name__ == '__main__':
    # global var
    number = 69
    start()
    logger.info('Main process started.')
    # global var is updated from function
    assert number == 42
    # configure child processes
    work(taskCount=3, steps=3)
    logger.info('Main process done.')
    end()
My ultimate goal is to call this module from my Flask app: run one function to initialize the queue that will be used for logging within the processes, and then run a task when a Flask endpoint is hit. The task could take a second, or probably a lot longer, but the "wrapper" (here, the work function) will always return a value for the webserver to give back to the user.
I've verified that before the process forks the global variable has the correct value. I have verified that the process is actually forking. I've tried asking ChatGPT; it was very not helpful in this case. I've tried everything. I am genuinely at my wits' end. If you are reading this, you are my last hope.
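As a side note on the logging setup: a QueueListener is not a handler, so it should not be passed to addHandler() (as the commented-out line in start() attempts). Here is a minimal sketch of how I understand the usual wiring, assuming a fork context; the StringIO stream is only there to make the example self-contained:

```python
import io
import logging
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import get_context

ctx = get_context('fork')
log_queue = ctx.Queue()

# The listener drains the queue and forwards records to real handlers.
# It is started directly, not attached with logger.addHandler().
stream = io.StringIO()
listener = QueueListener(log_queue, logging.StreamHandler(stream))
listener.start()

# Anything that logs (parent or forked child) only needs a QueueHandler
# pointing at the shared queue.
logger = logging.getLogger('app')
logger.addHandler(QueueHandler(log_queue))
logger.setLevel(logging.DEBUG)
logger.info('hello through the queue')

listener.stop()  # processes any remaining records, then stops the thread
print(stream.getvalue())
```

With this wiring, child processes only ever touch the queue, and a single listener thread in the main process does the actual writing.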
Thanks for the help. It turns out the worker processes are forked when the pool is created, so the variable has to be assigned before the pool is created for the change to be visible in the children.
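This fork-time snapshot behavior can be demonstrated in isolation. A minimal sketch (the `show` function and single-worker pool are just illustrative): the workers inherit a copy of the parent's globals at the moment Pool() forks them, so a later rebinding in the parent never reaches them.

```python
from multiprocessing import get_context

number = 1

def show():
    # Runs in the child: sees the value of `number` captured at fork time.
    return number

ctx = get_context('fork')
pool = ctx.Pool(processes=1)   # the worker forks here, with number == 1

number = 2                     # only the parent sees this rebinding

result = pool.apply(show)
pool.close()
pool.join()
print(result)   # 1, not 2: the child snapshotted the globals at Pool() time
```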
Changed start function:
def start():
    global logger
    logger = logging.getLogger('app')
    ctx = get_context('fork')
    global queue
    queue = ctx.Queue()
    consoleHandler = logging.StreamHandler()
    logger.addHandler(consoleHandler)
    #logger.addHandler(QueueListener(queue))
    logger.setLevel(logging.DEBUG)
    global number
    number = 42
    global pool
    pool = ctx.Pool()
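If you'd rather not depend on assignment order relative to pool creation, Pool also accepts initializer/initargs, which run once in each worker right after the fork. A sketch of that alternative (the `_init_worker` and `read_number` names are made up for the example):

```python
from multiprocessing import get_context

def _init_worker(value):
    # Runs once in each worker after the fork; rebinds the worker's
    # own copy of the global.
    global number
    number = value

def read_number():
    return number

ctx = get_context('fork')
number = 69
pool = ctx.Pool(processes=1, initializer=_init_worker, initargs=(42,))
number = 99   # parent-side changes after Pool() no longer matter

result = pool.apply(read_number)
pool.close()
pool.join()
print(result)   # 42: the worker got the value explicitly, not via fork timing
```

This makes the data flow explicit, so the code keeps working even if you later switch to the 'spawn' start method, where globals are not inherited at all.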