Tags: python, python-3.x, python-multiprocessing

Attribute sharing within a class using multiprocessing


I have a class with a dictionary as an attribute. In this class, I use multiprocessing: one process fills a Queue, and additional processes perform some calculations on the items in the Queue. When a criterion is met, I store the result in the dictionary, but I have found that the expected keys are missing from the dictionary when a process tries to store a value there.

from multiprocessing import Process, Queue, Lock
from random import randint
from time import sleep

class RNG():
    def __init__(self):
        self.mydict = {}
        self.done = False
        self.q = Queue(maxsize = 100)
        self.lock = Lock()
        
    def _fill_queue(self):
        while not self.done:
            rng = randint(1, 9_000_000)  # randint needs int bounds; 9e6 is a float
            
            if rng % 2 ==0:
                _type = 'even'
                if 'even' not in self.mydict.keys():
                    self.mydict['even'] = []
            else:
                _type = 'odd'
                if 'odd' not in self.mydict.keys():
                    self.mydict['odd'] = []
                    
            while self.q.full():
                sleep(10)
            
            self.lock.acquire()
            self.q.put((_type,rng))
            self.lock.release()
            
    def _process_queue(self):
        while not self.done:
            self.lock.acquire()
            if self.q.empty():
                self.lock.release()
                continue
                
            _type,num = self.q.get()
            self.lock.release()
            
            print(f'Appending {_type} number!')
            self.mydict[_type].append(num)
            self._check_for_exit()
            
    def _check_for_exit(self):
        if len(self.mydict['odd']) >= 1e6 and len(self.mydict['even'])>=1e6:
            self.done = True
    
    def run(self):
        jobs = []
        p = Process(target = self._fill_queue)
        jobs.append(p)
        p.start()
        for _ in range(5):
            p = Process(target = self._process_queue)
            jobs.append(p)
            p.start()
            
        for job in jobs:
            job.join()
        
if __name__ == '__main__':
    rng = RNG()
    rng.run()
     

When I run this, I get the following error when trying to append the number in the dictionary:

KeyError: 'even'
KeyError: 'odd'

Why are the keys not added to the dictionary? Also, if each process writes to a file and the file has the same name, does this mean that I need to implement some kind of semaphore or Pipe?


Solution

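  • Remember that these are running as separate processes, and they do not share memory. Each one has its own copy of the RNG instance and will not see changes made by the others, so a key added to self.mydict in the _fill_queue process never appears in the _process_queue processes. The same applies to self.done: setting it in one process does not stop the others. If you need to communicate between them, you need to use a Queue or a Pipe.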

    Often, what multiprocessing apps do is establish a "command" object to pass between them. The command has some kind of verb, plus the data to be acted on. So, when you want to add a key, you could send ('add', 'even') or something like that. Your queue handler can then handle several different kinds of commands.
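
The command idea could be sketched roughly as follows. This is only one possible arrangement, not the asker's code: the parent process is assumed to be the sole owner of the dictionary, and the workers only send command tuples through a Queue. The names apply_command, worker, and run, the 'append' and 'stop' verbs, and the small default counts are all hypothetical and chosen for illustration.

```python
from multiprocessing import Process, Queue
from random import randint


def apply_command(store, command):
    """Apply one command tuple to the dict owned by the parent process."""
    verb = command[0]
    if verb == "add":
        # ('add', key): make sure the key exists with an empty list.
        store.setdefault(command[1], [])
    elif verb == "append":
        # ('append', key, value): create the key if needed, then append.
        store.setdefault(command[1], []).append(command[2])
    else:
        raise ValueError(f"unknown command: {command!r}")


def worker(commands, count):
    """Child process: classify random numbers and send commands to the parent."""
    for _ in range(count):
        num = randint(1, 9_000_000)
        key = "even" if num % 2 == 0 else "odd"
        commands.put(("append", key, num))
    # Hypothetical sentinel command: tells the parent this worker is finished.
    commands.put(("stop",))


def run(n_workers=3, count=100):
    """Start the workers and build the dict in the parent from their commands."""
    commands = Queue()
    procs = [Process(target=worker, args=(commands, count))
             for _ in range(n_workers)]
    for p in procs:
        p.start()

    store = {}
    finished = 0
    # Drain the queue before joining so no worker blocks on a full pipe.
    while finished < n_workers:
        command = commands.get()
        if command[0] == "stop":
            finished += 1
        else:
            apply_command(store, command)

    for p in procs:
        p.join()
    return store


if __name__ == "__main__":
    result = run()
    print({key: len(values) for key, values in result.items()})
```

Because only the parent ever touches the dictionary, no Lock is needed around it, and Queue.put and Queue.get are already safe to call from several processes.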