
Changing values of list in multiprocessing


I am new to Python multiprocessing. Some background on the code below: I am trying to create three processes, one to add an element to the list, one to modify elements in the list, and one to print the list.

Ideally, the three processes all use the same list, held in shared memory and created with a Manager.

The problem is that testprocess2 cannot set the value to 0; in other words, it is unable to alter the list.

from multiprocessing import Process, Manager, Lock
from time import sleep
import random


class Trade:
    def __init__(self, id):
        self.exchange = None
        self.order_id = id


class testprocess2(Process):
    def __init__(self, trades, lock):
        super().__init__(args=(trades, lock))
        self.trades = trades
        self.lock = lock

    def run(self):
        while True:
            # lock.acquire()
            print("Altering")
            for idx in range(len(self.trades)):
                self.trades[idx].order_id = 0
            # lock.release()
            sleep(1)


class testprocess1(Process):
    def __init__(self, trades, lock):
        super().__init__(args=(trades, lock))
        self.trades = trades
        self.lock = lock

    def run(self):
        while True:
            print("start")
            for idx in range(len(self.trades)):
                print(self.trades[idx].order_id)

            sleep(1)


class testprocess(Process):
    def __init__(self, trades, lock):
        super().__init__(args=(trades, lock))
        self.trades = trades
        self.lock = lock

    def run(self):
        while True:
            # lock.acquire()
            n = random.randint(0, 9)
            print("adding random {}".format(n))
            self.trades.append(Trade(n))
            # lock.release()
            # print(trades)
            sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        records = manager.list([Trade(5)])
        lock = Lock()

        p1 = testprocess(records, lock)
        p1.start()

        p2 = testprocess1(records, lock)
        p2.start()

        p3 = testprocess2(records, lock)
        p3.start()

        p1.join()
        p2.join()
        p3.join()

Solution

  • Strictly speaking, your managed list is not in shared memory, and it is very important to understand what is going on. The actual list holding your Trade instances resides in a separate server process that is started when you call Manager(). When you then execute records = manager.list([Trade(5)]), records is not a direct reference to that list because, as I said, we are not dealing with shared memory. It is instead a special proxy object that implements the same methods as a list. When you, for example, invoke append on this proxy object, it serializes (pickles) the argument and transmits it to the manager's process over a socket or pipe, where it is deserialized and appended to the actual list. In short, operations on the proxy object are turned into remote method calls.
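To see the proxy at work, here is a minimal sketch that uses only the standard library; the helper name `proxy_demo` is hypothetical and chosen for illustration. It checks that `records` is a `multiprocessing.managers.BaseProxy` rather than a real list, and that an `append` made through the proxy is visible when the contents are fetched back from the manager's process.

```python
from multiprocessing import Manager
from multiprocessing.managers import BaseProxy


def proxy_demo():
    # Hypothetical helper: returns (is_proxy, contents) for a managed list.
    with Manager() as manager:
        records = manager.list([1, 2])
        # records is only a proxy; the real list lives in the manager's process.
        is_proxy = isinstance(records, BaseProxy)
        # append() becomes a remote call: the argument is pickled, sent to the
        # manager's process, unpickled there, and appended to the real list.
        records.append(3)
        # list(records) fetches the items back from the manager one by one.
        contents = list(records)
    return is_proxy, contents


if __name__ == "__main__":
    print(proxy_demo())  # (True, [1, 2, 3])
```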

    Now for your problem. You are trying to reset the order_id attribute with the following statement:

    self.trades[idx].order_id = 0
    

    Since we are dealing with a remote list via a proxy object, the above statement unfortunately becomes the equivalent of:

    trade = self.trades[idx] # fetch a copy of the object from the remote list
    trade.order_id = 0 # reset the order_id to 0 on the local copy
    

    What is missing is writing the modified trade object back to the list:

    self.trades[idx] = trade
    

    So your single update statement really needs to be replaced with the above 3-statement sequence.
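The difference can be checked directly. The sketch below is a minimal, single-process illustration; the helper names `reset_without_write_back`, `reset_with_write_back` and `compare_updates` are hypothetical, and `Trade` is the class from the question. The in-place attribute assignment is lost, while the fetch, modify and write-back sequence persists in the managed list.

```python
from multiprocessing import Manager


class Trade:
    def __init__(self, id):
        self.exchange = None
        self.order_id = id


def reset_without_write_back(trades):
    # Hypothetical helper reproducing the original (broken) statement.
    for idx in range(len(trades)):
        # trades[idx] returns an unpickled local copy of the remote Trade,
        # so this assignment changes only that temporary copy.
        trades[idx].order_id = 0


def reset_with_write_back(trades):
    # Hypothetical helper using the fetch / modify / write-back sequence.
    for idx in range(len(trades)):
        trade = trades[idx]   # fetch a local copy
        trade.order_id = 0    # modify the copy
        trades[idx] = trade   # send the modified copy back to the manager


def compare_updates():
    with Manager() as manager:
        trades = manager.list([Trade(5), Trade(7)])
        reset_without_write_back(trades)
        before = [t.order_id for t in trades]
        reset_with_write_back(trades)
        after = [t.order_id for t in trades]
    return before, after


if __name__ == "__main__":
    print(compare_updates())  # ([5, 7], [0, 0])
```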

    I have also taken the liberty of modifying your code in several ways.

    1. The PEP 8 Style Guide for Python Code recommends that class names use the CapWords convention (e.g. TestProcess).
    2. Since all of your process classes are constructed identically (i.e., they have identical __init__ methods), I have created an abstract base class, TestProcess, from which these classes inherit. Each of them only has to provide a run method.
    3. I have made these processes daemon processes, which means they are terminated automatically when the main process exits. I did this for demo purposes so that the program does not loop endlessly: the main process terminates after 15 seconds.
    4. You do not need to pass the trades and lock arguments to the __init__ method of the Process class. The args argument is meant to be used together with the target argument: if you were not deriving your classes from Process and simply wanted your new process to run a function foo that takes arguments trades and lock, you would write p1 = Process(target=foo, args=(trades, lock)). See the documentation for the threading.Thread class, whose interface multiprocessing.Process mirrors, for details. I actually see very little value in deriving your classes from multiprocessing.Process (not doing so gives better opportunity for reuse). But since you did, your __init__ method already sets the instance attributes self.trades and self.lock, which your run method uses when it is invoked implicitly by your call to start. There is nothing further you need to do. See the two additional code examples at the end.
    from multiprocessing import Process, Manager, Lock
    from time import sleep
    import random
    from abc import ABC, abstractmethod
    
    
    class Trade:
        def __init__(self, id):
            self.exchange = None
            self.order_id = id
    
    
    class TestProcess(Process, ABC):
        def __init__(self, trades, lock):
            Process.__init__(self, daemon=True)
            self.trades = trades
            self.lock = lock
    
        @abstractmethod
        def run(self):
            pass
    
    class TestProcess2(TestProcess):
        def run(self):
            while True:
                # self.lock.acquire()
                print("Altering")
                for idx in range(len(self.trades)):
                    trade = self.trades[idx]
                    trade.order_id = 0
                    # We must tell the managed list that it has been updated!!!:
                    self.trades[idx] = trade
                # self.lock.release()
                sleep(1)
    
    
    class TestProcess1(TestProcess):
        def run(self):
            while True:
                print("start")
                for idx in range(len(self.trades)):
                    print(f'index = {idx}, order id = {self.trades[idx].order_id}')
    
                sleep(1)
    
    
    class TestProcess0(TestProcess):
        def run(self):
            while True:
                # self.lock.acquire()
                n = random.randint(0, 9)
                print("adding random {}".format(n))
                self.trades.append(Trade(n))
                # self.lock.release()
                # print(self.trades)
                sleep(5)
    
    
    if __name__ == "__main__":
    
        with Manager() as manager:
            records = manager.list([Trade(5)])
            lock = Lock()
    
            p1 = TestProcess0(records, lock)
            p1.start()
    
            p2 = TestProcess1(records, lock)
            p2.start()
    
            p3 = TestProcess2(records, lock)
            p3.start()
    
            sleep(15) # run for 15 seconds
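The code above leaves the lock calls commented out. Writing back a fetched copy takes separate steps (fetch, modify locally, store), so if two processes ever modify the same element concurrently, one write-back can silently overwrite the other. A minimal sketch, assuming hypothetical helpers `bump` and `run_workers` and the same `Trade` class, holds the shared `Lock` around the whole read-modify-write sequence:

```python
from multiprocessing import Lock, Manager, Process


class Trade:
    def __init__(self, id):
        self.exchange = None
        self.order_id = id


def bump(trades, lock, times):
    # Hypothetical worker: increments trades[0].order_id `times` times.
    for _ in range(times):
        # Holding the lock makes fetch / modify / write-back a single step
        # with respect to every other process that uses the same lock.
        with lock:
            trade = trades[0]
            trade.order_id += 1
            trades[0] = trade


def run_workers(workers=2, times=50):
    # Hypothetical driver: starts `workers` processes and returns the final count.
    with Manager() as manager:
        trades = manager.list([Trade(0)])
        lock = Lock()
        procs = [Process(target=bump, args=(trades, lock, times))
                 for _ in range(workers)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return trades[0].order_id


if __name__ == "__main__":
    print(run_workers())  # 100
```

With the lock held for the whole sequence, the final count should always equal workers * times; without it, some increments could be lost. In the original program the risk is small, because the only other writer appends new trades rather than replacing existing ones.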
    

    Using classes not derived from multiprocessing.Process

    from multiprocessing import Process, Manager, Lock
    from time import sleep
    import random
    from abc import ABC, abstractmethod
    
    
    class Trade:
        def __init__(self, id):
            self.exchange = None
            self.order_id = id
    
    
    class TestProcess(ABC):
        def __init__(self, trades, lock):
            self.trades = trades
            self.lock = lock
    
        @abstractmethod
        def process(self):
            pass
    
    class TestProcess2(TestProcess):
        def process(self):
            while True:
                # self.lock.acquire()
                print("Altering")
                for idx in range(len(self.trades)):
                    trade = self.trades[idx]
                    trade.order_id = 0
                    # We must tell the managed list that it has been updated!!!:
                    self.trades[idx] = trade
                # self.lock.release()
                sleep(1)
    
    
    class TestProcess1(TestProcess):
        def process(self):
            while True:
                print("start")
                for idx in range(len(self.trades)):
                    print(f'index = {idx}, order id = {self.trades[idx].order_id}')
    
                sleep(1)
    
    
    class TestProcess0(TestProcess):
        def process(self):
            while True:
                # self.lock.acquire()
                n = random.randint(0, 9)
                print("adding random {}".format(n))
                self.trades.append(Trade(n))
                # self.lock.release()
                # print(self.trades)
                sleep(5)
    
    
    if __name__ == "__main__":
    
        with Manager() as manager:
            records = manager.list([Trade(5)])
            lock = Lock()
    
            tp = TestProcess0(records, lock)
            p1 = Process(target=tp.process, daemon=True)
            p1.start()
    
            tp1 = TestProcess1(records, lock)
            p2 = Process(target=tp1.process, daemon=True)
            p2.start()
    
            tp2 = TestProcess2(records, lock)
            p3 = Process(target=tp2.process, daemon=True)
            p3.start()
    
            sleep(15) # run for 15 seconds
    

    Using functions instead of classes derived from multiprocessing.Process

    from multiprocessing import Process, Manager, Lock
    from time import sleep
    import random
    
    
    class Trade:
        def __init__(self, id):
            self.exchange = None
            self.order_id = id
    
    
    def testprocess2(trades, lock):
        while True:
            # lock.acquire()
            print("Altering")
            for idx in range(len(trades)):
                trade = trades[idx]
                trade.order_id = 0
                # We must tell the managed list that it has been updated!!!:
                trades[idx] = trade
            # lock.release()
            sleep(1)
    
    
    def testprocess1(trades, lock):
        while True:
            print("start")
            for idx in range(len(trades)):
                print(f'index = {idx}, order id = {trades[idx].order_id}')
    
            sleep(1)
    
    
    def testprocess(trades, lock):
        while True:
            # lock.acquire()
            n = random.randint(0, 9)
            print("adding random {}".format(n))
            trades.append(Trade(n))
            # lock.release()
            # print(trades)
            sleep(5)
    
    
    if __name__ == "__main__":
    
        with Manager() as manager:
            records = manager.list([Trade(5)])
            lock = Lock()
    
            p1 = Process(target=testprocess, args=(records, lock), daemon=True)
            p1.start()
    
            p2 = Process(target=testprocess1, args=(records, lock), daemon=True)
            p2.start()
    
            p3 = Process(target=testprocess2, args=(records, lock), daemon=True)
            p3.start()
    
            sleep(15) # run for 15 seconds