I have a class that has a dictionary as attribute. In this class, I run multiprocessing
to fill up a Queue
and then use additional process to perform some calculations on items that are in the Queue
. When the criteria is met, then I store the results in this dictionary, but I have seen that the dictionary keys are not present in the dictionary when the process wants to store a value there.
class RNG():
def __init__(self):
self.mydict = {}
self.done = False
self.q = Queue(maxsize = 100)
self.lock = Lock()
def _fill_queue(self):
while not self.done:
rng = randint(1,9e6)
if rng % 2 ==0:
_type = 'even'
if 'even' not in self.mydict.keys():
self.mydict['even'] = []
else:
_type = 'odd'
if 'odd' not in self.mydict.keys():
self.mydict['odd'] = []
while self.q.full():
sleep(10)
self.lock.acquire()
self.q.put((_type,rng))
self.lock.release()
def _process_queue(self):
while not self.done:
self.lock.acquire()
if self.q.empty():
self.lock.release()
continue
_type,num = self.q.get()
self.lock.release()
print(f'Appending {_type} number!')
self.mydict[_type].append(num)
self._check_for_exit()
def _check_for_exit(self):
if len(self.mydict['odd']) >= 1e6 and len(self.mydict['even'])>=1e6:
self.done = True
def run(self):
jobs = []
p = Process(target = self._fill_queue)
jobs.append(p)
p.start()
for _ in range(5):
p = Process(target = self._process_queue)
jobs.append(p)
p.start()
for job in jobs:
job.join()
if __name__ == '__main__':
rng = RNG()
rng.run()
When I run this, I get the following error when trying to append the number in the dictionary:
KeyError: 'even'
KeyError: 'odd'
Why the keys are not added in the dictionary? And also, if each process manages to write into a file and the file has the same name, does this means that I need to implement some kind of semaphore or Pipe
?
Remember these are running as two separate processes. They do not share memory. Each one has its own copy of the RNG instance, and will not see changes made by the other. If you need to communicate between them, you need to use a Queue or a Pipe.
Often, what multiprocessing apps to is establish a "command" object to pass between them. The command has some kind of verb, plus data to be acted on. So, when you want to add a key, you could send ('add','even') or something like that. Your queue handler can then do several different kinds of things.