Tags: python, multiprocessing, gevent, defunct

What is the cleanest way to stop a python multiprocessing worker attached to a queue in an infinite loop?


I'm implementing a producer-consumer pattern in Python using multiprocessing.Pool and multiprocessing.Queue. Consumers are pre-forked processes that use gevent to spawn multiple tasks.

Here is a trimmed-down version of the code:

import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time

# Task queue
queue = Queue()

def init_worker ():
    # Ignore signals in worker
    signal.signal( signal.SIGTERM, signal.SIG_IGN )
    signal.signal( signal.SIGINT, signal.SIG_IGN )
    signal.signal( signal.SIGQUIT, signal.SIG_IGN )

# One of the worker tasks
def worker_task1( ):
    while True:
        try:
            m = queue.get( timeout = 2 )

            # Break out if producer says quit
            if m == 'QUIT':
                print 'TIME TO QUIT'
                break

        except QueueEmpty:
            pass

# Worker
def work( ):
    gevent.joinall([
        gevent.spawn( worker_task1 ),
    ])

pool = Pool( 2, init_worker )
for i in xrange( 2 ):
    pool.apply_async( work )

try:
    while True:
        queue.put( 'Some Task' )
        time.sleep( 2 )

except KeyboardInterrupt as e:
    print 'STOPPING'

    # Signal all workers to quit
    for i in xrange( 2 ):
        queue.put( 'QUIT' )

    pool.join()

Now when I try to quit it, I get the following state:

  1. The parent process is waiting for one of the children to join.
  2. One of the children is in the defunct state: it has finished, but the parent is still waiting for the other child to finish.
  3. The other child is showing: futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ....

So what is the correct way to end such a process cleanly?


Solution

  • I figured out the problem. According to the documentation for multiprocessing.Pool.join(), the pool must be close()ed (or terminate()d) before it can be join()ed. Adding pool.close() before pool.join() solved the problem.
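
For illustration, here is a minimal sketch of the same shutdown sequence with close() placed before join(). It is not the original program. It assumes Python 3 and the "fork" start method (so POSIX only), leaves gevent out to stay self-contained, and hands the queue to each worker through the pool initializer instead of a module-level global. The names SENTINEL, work and run are made up for this example.

```python
import multiprocessing as mp
from queue import Empty

SENTINEL = "QUIT"   # hypothetical shutdown marker, same role as 'QUIT' in the question
_queue = None       # set inside each worker process by init_worker


def init_worker(queue):
    # Runs once in every pool worker; stores the shared task queue.
    global _queue
    _queue = queue


def work():
    # Consume tasks until the sentinel arrives; return how many were handled.
    handled = 0
    while True:
        try:
            item = _queue.get(timeout=2)
        except Empty:
            continue
        if item == SENTINEL:
            return handled
        handled += 1


def run(num_workers=2, num_tasks=6):
    # Assumption: the "fork" start method, matching the pre-forked workers above.
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    pool = ctx.Pool(num_workers, init_worker, (queue,))
    results = [pool.apply_async(work) for _ in range(num_workers)]

    for i in range(num_tasks):
        queue.put("task %d" % i)

    # One sentinel per submitted work() call, queued after all real tasks.
    for _ in range(num_workers):
        queue.put(SENTINEL)

    pool.close()   # no more work will be submitted; must come before join()
    pool.join()    # waits for the workers to exit once work() has returned
    return sum(r.get() for r in results)
```

Calling run(num_workers=2, num_tasks=6) should return 6 once both workers have seen their sentinel. Because the queue is FIFO and there is a single producer, every real task is consumed before the first sentinel is read.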