I'm using queues from the multiprocessing library for sharing data between processes.
I have 2 queues, both are limited to 10 objects, the first queue has one process that "puts" objects into it and many processes "get" from it.
The second queue has many processes that "put" objects into it, and only one process "gets" from it.
The system works perfectly for a while and then starts behaving strangely: only the process that "puts" objects into the first queue continues to work, while the processes that read from the first queue apparently stop working (even though the processes are still alive). It seems there is a deadlock here, but I'm not sure. Here is my code:
import logging
import multiprocessing
import queue
from multiprocessing import Process
logger = logging.get_logger(__name__)
# Processes 2, 3 ,4:
class Processes_234(Process):
    """Worker process (processes 2, 3, 4).

    Consumes (el1, el2, el3) triples from ``message_queue_1``, processes
    them, and forwards the result to ``message_queue_2``.  When the output
    queue is full, the oldest element is dropped to make room (best effort).
    """

    def __init__(self, message_queue_1, message_queue_2):
        Process.__init__(self)
        self.message_queue_1 = message_queue_1  # input queue (single producer)
        self.message_queue_2 = message_queue_2  # output queue (single consumer)

    def run(self):
        while True:
            try:
                # Blocking get from the input queue.
                el1, el2, el3 = self.message_queue_1.get()
                logger.debug('Processes234: get from queue')
            except Exception as exp:
                logger.debug("message_queue_1: queue empty, Exception message: " + str(exp))
                # BUG FIX: without this `continue`, a failed get() fell
                # through to the put below with el1/el2/el3 either undefined
                # (NameError on the first iteration) or stale from a
                # previous iteration.
                continue
            # do some stuff with el1, el2, el3...
            try:
                # Non-blocking put into the second queue.
                self.message_queue_2.put_nowait((el1, el2, el3))
                logger.debug('Processes234: put into queue')
            except queue.Full as excpt:  # narrowed from bare `Exception`
                logger.debug(excpt)
                logger.debug("message_queue_2: queue is full")
                # The queue is full, so replace the oldest element with the
                # new one.
                try:
                    self.message_queue_2.get_nowait()
                    self.message_queue_2.put_nowait((el1, el2, el3))
                except (queue.Empty, queue.Full):
                    # Another process raced us (drained or refilled the
                    # queue first) - best effort, ignore.  Narrowed from a
                    # bare `except:` which also swallowed KeyboardInterrupt.
                    pass
# process 5:
class Process5(Process):
def __init__(self, message_queue_2):
Process.__init__(self)
self.message_queue_2 = message_queue_2
def run(self):
while True:
try:
# get from queue
el1, el2, el = self.message_queue_2.get()
print('Process5: get from queue')
except Exception as exp:
print("message_queue_2: queue empty, Exception message: " + str(exp))
def start_process_1():
    """Process 1 (producer): spawn the worker and consumer processes, then
    endlessly push (int, int, str) triples into ``message_queue_1``.

    When the queue is full, the oldest element is dropped to make room
    (best effort).  This function never returns.
    """
    # init queues (bounded to 10 elements each)
    message_queue_1 = multiprocessing.Queue(maxsize=10)
    message_queue_2 = multiprocessing.Queue(maxsize=10)
    processes_234 = [Processes_234(message_queue_1, message_queue_2)
                     for _ in range(3)]
    for proc in processes_234:
        proc.start()
    process5 = Process5(message_queue_2)
    process5.start()
    counter = 1
    while True:
        el1 = counter + 1
        el2 = counter + counter
        # payload size varies with counter (up to ~60000 repetitions)
        el3 = "some string " * ((counter ** 2) % 60000)
        counter += 1
        # start passing data
        try:
            # Non-blocking put into the first queue.
            message_queue_1.put_nowait((el1, el2, el3))
            logger.debug('Process1: put into queue')
        except queue.Full as excpt:  # narrowed from bare `Exception`
            logger.debug(excpt)
            logger.debug("message_queue_1: queue is full")
            # The queue is full, so replace the oldest element with the new
            # one.
            try:
                message_queue_1.get_nowait()
                message_queue_1.put_nowait((el1, el2, el3))
            except (queue.Empty, queue.Full):
                # Another process raced us (drained or refilled the queue
                # first) - best effort, ignore.  Narrowed from a bare
                # `except:` which also swallowed KeyboardInterrupt.
                pass
# Script entry point: run the producer (process 1) in the main process;
# it spawns the worker and consumer processes itself.
if __name__ == '__main__':
    start_process_1()
Does anyone know what my problem is?
I'm using python 3.6.5
Finally I was able to solve the problem: it was the logger! According to the logging library's documentation, the logger is thread-safe but not multiprocess-safe.
I changed the code so that each process has its own logger and it solved the issue.