Search code examples
pythonpython-3.xmultiprocessingmessage-queuedeadlock

Deadlock in python multiprocessing queue


I'm using queues from the multiprocessing library for sharing data between processes.

I have 2 queues, both are limited to 10 objects, the first queue has one process that "puts" objects into it and many processes "get" from it.

The second queue has many processes that "put" objects into it, and only one process "gets" from it.

The system works perfectly for a while and then starts behaving strangly: only the process that "puts" objects into the first queue continues to work while the processes that read from the first queue apparently are not behaving/working anymore (even though the processes are alive). It seems that there's a deadlock here but I'm not sure, here is my code:

UPDATED

import multiprocessing
import logging
from multiprocessing import Process

logger = logging.get_logger(__name__)

# Processes 2, 3 ,4:

class Processes_234(Process):
    def __init__(self, message_queue_1, message_queue_2):
        Process.__init__(self)
        self.message_queue_1 = message_queue_1
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el3 = self.message_queue_1.get()
                logger.debug('Processes234: get from queue')
            except Exception as exp:
                logger.debug("message_queue_1: queue empty, Exception message: " + str(exp))

            # do some stuff with el1, el2, el3...

            try:
                # put into second queue
                self.message_queue_2.put_nowait((el1, el2, el3))
                logger.debug('Processes234: put into queue')
            except Exception as excpt:
                logger.debug(excpt)
                logger.debug("message_queue_2: queue is full")
                # the queue is full so replace the old element with the new one
                try:
                    self.message_queue_2.get_nowait()
                    self.message_queue_2.put_nowait((el1, el2, el3))
                    # in case other process already fill the queue - ignore
                except:
                    pass


# process 5:
class Process5(Process):
    def __init__(self, message_queue_2):
        Process.__init__(self)
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el = self.message_queue_2.get()
                print('Process5: get from queue')

            except Exception as exp:
                print("message_queue_2: queue empty, Exception message: " + str(exp))


def start_process_1():
    # init queues
    message_queue_1 = multiprocessing.Queue(maxsize=10)
    message_queue_2 = multiprocessing.Queue(maxsize=10)

    processes_234 = [Processes_234(message_queue_1, message_queue_2)
                     for _ in range(3)]

    for proc in processes_234:
        proc.start()

    process5 = Process5(message_queue_2)
    process5.start()
    counter = 1

    while True:
        el1 = counter + 1
        el2 = counter + counter
        el3 = "some string " * ((counter ** 2) % 60000)
        counter += 1
        # start passing data
        try:

            # put into queue
            message_queue_1.put_nowait((el1, el2, el3))
            logger.debug('Process1: put into queue')
        except Exception as excpt:
            logger.debug(excpt)
            logger.debug("message_queue_1: queue is full")
            # the queue is full so replace the old element with the new one
            try:
                message_queue_1.get_nowait()
                message_queue_1.put_nowait((el1, el2, el3))
                # in case other process already fill the queue - ignore
            except:
                pass


if __name__ == '__main__':
    start_process_1()

does anyone know what my problem is?

I'm using python 3.6.5


Solution

  • Finally I was able to solve the problem, it was the logger! According to logging library the logger is thread safe but not multi-process safe.

    I changed the code so that each process has its own logger and it solved the issue.