
Initialize each instance for each worker in multiprocessing


As a container for shared (static) data and the database connection, I use a Worker class:

import os
import multiprocessing as mp
import time

class Worker:
    def __init__(self, data):
        self.data = data
    def initializer(self):
        # setting up a database connection
        print("{} with PID {} initialized".format(self, os.getpid()))
    def __call__(self, value):
        print("{} with PID {} called with value={}".format(self, os.getpid(), value))
        # doing something with self.data and write it to the database
        time.sleep(0.5)

def main():
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1,2,3,4])
    print("{} in main".format(worker))
    pool = mp.Pool(processes=2, initializer=worker.initializer)
    pool.map(worker, range(4))

if __name__=="__main__":
    main()

However, the initialization is not working. I think it's because initializer() and __call__() are called on different instances. The output of the above code is as follows:

main has PID 5
<__main__.Worker object at 0xA> in main
<__mp_main__.Worker object at 0xB> with PID 6 initialized
<__mp_main__.Worker object at 0xC> with PID 6 called with value=0
<__mp_main__.Worker object at 0xD> with PID 7 initialized
<__mp_main__.Worker object at 0xE> with PID 7 called with value=1
<__mp_main__.Worker object at 0xD> with PID 6 called with value=2
<__mp_main__.Worker object at 0xE> with PID 7 called with value=3

Obviously, the location of the Worker instance has to change with the process ID (PID), because each child process works on its own duplicate of the instance. But why does it also change between initializer() and __call__() within the same PID? As a result, my database connection is not available.

Why does this happen, and how can I do it properly without resorting to global variables or similar?
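The mismatch can be reproduced without a pool at all. A Pool hands callables to its worker processes by pickling them, and every unpickle builds a fresh object. The minimal sketch below imitates that with pickle.dumps/pickle.loads inside a single process. It is only an approximation of what the pool does (for example, under the fork start method the initializer's instance is inherited from the parent rather than unpickled), and the string "connection" is a placeholder for a real database connection.

```python
import pickle


class Worker:
    def __init__(self, data):
        self.data = data

    def initializer(self):
        # Placeholder for opening a database connection; it is stored on
        # *this* instance only.
        self.db = "connection"

    def __call__(self, value):
        # Report whether this instance ever got a connection.
        return getattr(self, "db", None)


worker = Worker([1, 2, 3, 4])

# Roughly what pool.map(worker, ...) does: each batch of tasks carries its
# own pickled copy of the callable.
task_copy = pickle.loads(pickle.dumps(worker))

# Roughly what initializer=worker.initializer does under spawn: the bound
# method is pickled together with a separate copy of its instance.
init_method = pickle.loads(pickle.dumps(worker.initializer))
init_method()

print(task_copy is worker)                # False
print(init_method.__self__ is task_copy)  # False
print(init_method.__self__.db)            # connection
print(task_copy(0))                       # None
```

The instance that received the connection and the instance whose __call__() runs are simply different objects, which is exactly what the PIDs and addresses in the output above show.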


Solution

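  • In each process you are seeing two different Worker objects. One is the copy bound to worker.initializer, which the process received when it was started; the others are the copies that pool.map pickles and sends along with every batch of tasks. Each pickle/unpickle round trip creates a new object, so attributes set on self in initializer() never reach the instance whose __call__() runs. (A later copy can even reuse the memory address of an earlier one once that has been garbage-collected, which is why repeated calls in one process may print the same address.)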

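    The solution is indeed to use a global. Globals are not shared between processes: each worker process has its own copy of the module's global namespace, so a global set by the initializer lives as long as that process does and is visible to every task it runs, no matter which Worker copy executes the task.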

    import os
    import multiprocessing as mp
    import time
    
    # Per-process database handle; set by Worker.initializer() in each worker process.
    db = None
    
    class Worker:
        def __init__(self, data):
            self.data = data
    
        def initializer(self):
            # setting up a database connection (a string stands in for it here)
            global db
            print("{} with PID {} initialized".format(self, os.getpid()))
            db = "set by {} with PID {} initialized".format(self, os.getpid())
    
        def __call__(self, value):
            print("{} with PID {} called with value={}".format(self, os.getpid(), value))
            # Reading the module-level db needs no `global` statement
            print("Using db {}".format(db))
            # doing something with self.data and write it to the database
            time.sleep(0.5)
    
    def main():
        print("main has PID {}".format(os.getpid()))
        worker = Worker([1,2,3,4])
        # The with block cleans up the pool once map() has returned
        with mp.Pool(processes=2, initializer=worker.initializer) as pool:
            pool.map(worker, range(4))
    
    if __name__=="__main__":
        main()
    

    produces

    main has PID 29128
    <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
    <__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=0
    Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
    <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
    <__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=1
    Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
    <__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=2
    Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
    <__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=3
    Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized