As a container for shared (static) data and the database connection, I use a Worker
class:
import os
import multiprocessing as mp
import time

class Worker:
    def __init__(self, data):
        self.data = data

    def initializer(self):
        # setting up a database connection
        print("{} with PID {} initialized".format(self, os.getpid()))

    def __call__(self, value):
        print("{} with PID {} called with value={}".format(self, os.getpid(), value))
        # doing something with self.data and write it to the database
        time.sleep(0.5)

def main():
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1,2,3,4])
    # this print is implied by the "in main" line of the output shown below
    print("{} in main".format(worker))
    pool = mp.Pool(processes=2, initializer=worker.initializer)
    pool.map(worker, range(4))

if __name__=="__main__":
    main()
However, the initialization is not working. I think it's because initializer()
and __call__()
are called on different instances. The output of the above code is as follows:
main has PID 5
<__main__.Worker object at 0xA> in main
<__mp_main__.Worker object at 0xB> with PID 6 initialized
<__mp_main__.Worker object at 0xC> with PID 6 called with value=0
<__mp_main__.Worker object at 0xD> with PID 7 initialized
<__mp_main__.Worker object at 0xE> with PID 7 called with value=1
<__mp_main__.Worker object at 0xD> with PID 6 called with value=2
<__mp_main__.Worker object at 0xE> with PID 7 called with value=3
Obviously, the address of the Worker
instance has to change with the process ID (PID), because the fork yields a new but duplicated instance. But why does it also differ between initializer()
and __call__()
within the same PID? As a result, my database connection is not available in __call__().
Why is that? And how can I do this properly, without global variables and the like?
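In each process you are seeing two separate copies of your worker object. The __mp_main__ in your output indicates that the child processes are started fresh rather than forked, so everything they receive arrives pickled: one copy is unpickled along with the bound method worker.initializer that you pass as the initializer, and another is unpickled along with the worker callable that you pass to pool.map(). Anything initializer() stores on self therefore never reaches the instance on which __call__() runs.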
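To make the two copies visible without starting a pool, here is a minimal sketch that round-trips the bound method and the object through pickle separately, which is roughly what happens to them on their way to a spawned process. The db attribute and the "connection" string are placeholders I made up for illustration, not part of your code.

```python
import pickle


class Worker:
    def __init__(self, data):
        self.data = data

    def initializer(self):
        # placeholder for a real connection; stored on this copy only
        self.db = "connection"

    def __call__(self, value):
        # returns None if this copy was never initialized
        return getattr(self, "db", None)


worker = Worker([1, 2, 3, 4])

# The initializer and the task function are serialized independently,
# so each round trip reconstructs its own Worker instance.
init_copy = pickle.loads(pickle.dumps(worker.initializer))
task_copy = pickle.loads(pickle.dumps(worker))

init_copy()  # sets db on the instance behind the bound method

print(init_copy.__self__ is task_copy)  # False
print(task_copy(0))                     # None
```

The instance behind init_copy has a db, but task_copy is a different object and never sees it, which matches the differing addresses in your output.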
The solution is indeed to use a global. Globals are not shared between processes, so each worker process sets and uses its own db:
import os
import multiprocessing as mp
import time

class Worker:
    def __init__(self, data):
        self.data = data

    def initializer(self):
        # setting up a database connection
        global db
        print("{} with PID {} initialized".format(self, os.getpid()))
        db = "set by {} with PID {} initialized".format(self, os.getpid())

    def __call__(self, value):
        print("{} with PID {} called with value={}".format(self, os.getpid(), value))
        global db
        print("Using db {}".format(db))
        # doing something with self.data and write it to the database
        time.sleep(0.5)

def main():
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1,2,3,4])
    pool = mp.Pool(processes=2, initializer=worker.initializer)
    pool.map(worker, range(4))

if __name__=="__main__":
    main()
This produces:
main has PID 29128
<__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=0
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=1
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=2
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=3
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
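If the bare module-level global bothers you, one variation is to keep the per-process state on the class instead of on the instance. This is only a sketch under assumptions: init_db is a name I made up, and the string stands in for a real connection. It is still process-wide state, just namespaced under Worker. It works because every copy of the instance in a given process shares the same class object.

```python
import multiprocessing as mp
import os


class Worker:
    db = None  # per-process state, set by init_db() in each worker process

    def __init__(self, data):
        self.data = data

    @classmethod
    def init_db(cls):
        # placeholder for opening a real database connection
        cls.db = "connection opened in PID {}".format(os.getpid())

    def __call__(self, value):
        # any copy of the instance in this process sees the class attribute
        return (value, sum(self.data), type(self).db)


def main():
    worker = Worker([1, 2, 3, 4])
    with mp.Pool(processes=2, initializer=Worker.init_db) as pool:
        for result in pool.map(worker, range(4)):
            print(result)


if __name__ == "__main__":
    main()
```

Each result tuple carries the PID of the process whose initializer opened the "connection", so you can check that the state set in init_db() is the state seen in __call__().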