Tags: python, multithreading, multiprocessing, python-multithreading, pika

Python Multiprocessing


I'm using Python multiprocessing for RabbitMQ consumers. On application start I create 4 WorkerProcesses.

def start_workers(num=4):
    for i in xrange(num):
        process = WorkerProcess()
        process.start()

Below you find my worker class. The logic works so far: I create 4 parallel consumer processes. The problem is what happens after a process gets killed, because I then want to create a new one to replace it. With the logic below, the new process is created as a child of the old one, and after a while the memory runs out of space. Is there any way with Python multiprocessing to start a new process and kill the old one correctly?

class WorkerProcess(multiprocessing.Process):

    def __init__(self):
        super(WorkerProcess, self).__init__()
        app.logger.info('%s: Starting new worker process!', self.name)

    def shutdown(self):
        # Spawn a replacement worker from inside this (dying) worker
        process = WorkerProcess()
        process.start()
        return True

    def kill(self):
        start_workers(1)
        self.terminate()

    def run(self):
        try:
            # Connect to RabbitMQ
            credentials = pika.PlainCredentials(app.config.get('RABBIT_USER'),
                                                app.config.get('RABBIT_PASS'))
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=app.config.get('RABBITMQ_SERVER'),
                                          port=5672, credentials=credentials))
            channel = connection.channel()

            # Declare the queue
            channel.queue_declare(queue='screenshotlayer',
                                  auto_delete=False,
                                  durable=True)

            app.logger.info('%s: Start to consume from RabbitMQ.', self.name)
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(callback, queue='screenshotlayer')
            channel.start_consuming()
            app.logger.info('%s: Worker is going to sleep!', self.name)

            # do what channel.start_consuming() does, but with a stopping signal
            # while self.stop_working.is_set():
            #     channel.transport.connection.process_data_events()

            channel.stop_consuming()
            connection.close()
        except Exception:
            self.shutdown()
        return 0
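
For illustration, here is a minimal, self-contained sketch of the same respawn pattern, with no RabbitMQ involved. The function name and the generation counter are hypothetical, only for the demo:

import multiprocessing
import os
import time


def worker(generation):
    # Each worker spawns its replacement as its own child, just like
    # WorkerProcess.shutdown() above.
    print('generation %d: pid=%d, parent=%d' % (generation, os.getpid(), os.getppid()))
    time.sleep(1)  # pretend to do some work
    if generation < 5:
        replacement = multiprocessing.Process(target=worker, args=(generation + 1,))
        replacement.start()
    # Non-daemonic children are joined automatically when a process exits,
    # so every old worker lingers until its whole chain of replacements is done.


if __name__ == '__main__':
    multiprocessing.Process(target=worker, args=(0,)).start()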

Thank You


Solution

  • In the main process, keep track of your subprocesses (in a list) and loop over them with .join(timeout=50) (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.join).

    Then check whether it is alive (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.is_alive).

    If it is not, replace it with a fresh one.

    def start_workers(n):
        wks = []
        for _ in range(n):
            wks.append(WorkerProcess())
            wks[-1].start()

        while True:
            # Wait a while on each worker (this also reaps finished ones)
            for p in wks:
                p.join(timeout=50)

            # Remove all terminated processes
            wks = [p for p in wks if p.is_alive()]

            # Start new processes to replace the dead ones
            for _ in range(n - len(wks)):
                wks.append(WorkerProcess())
                wks[-1].start()
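
    With the supervision handled in the main process, the workers no longer need shutdown() or kill() to respawn themselves. Below is a sketch of how the loop above could also tear everything down cleanly; it assumes the WorkerProcess class from the question, and the poll interval (a plain sleep instead of join(timeout)) and the Ctrl-C handling are assumptions, not part of the original answer:

    import time

    def start_workers(n, poll_interval=5):
        # Hypothetical variant: the main process owns all workers,
        # replaces dead ones, and shuts everything down on Ctrl-C.
        wks = [WorkerProcess() for _ in range(n)]
        for p in wks:
            p.start()
        try:
            while True:
                time.sleep(poll_interval)
                wks = [p for p in wks if p.is_alive()]  # drop dead workers
                while len(wks) < n:                     # top back up to n workers
                    p = WorkerProcess()
                    p.start()
                    wks.append(p)
        except KeyboardInterrupt:
            for p in wks:
                p.terminate()
            for p in wks:
                p.join()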