
Python multiprocess Process never terminates


My routine below takes a list of urllib2.Request objects and spawns a new process per request to fire them off. The goal is asynchronous speed, so it's all fire-and-forget (no response needed). The problem is that the processes spawned below never terminate, so after a few of these calls the box will OOM. Context: Django web app. Any help?

import logging
import multiprocessing
import traceback
import urllib2

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2:
    MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)



def request_manager(req_list):
    try:
        # put each request in the queue and spawn a worker process for it
        for req in req_list:
            MPQ.put(req)

            worker = multiprocessing.Process(target=process_request, args=(MPQ,))
            worker.daemon = True
            worker.start()

        # move on after the queue is empty
        MPQ.join()

    except Exception, e:
        logging.error(traceback.format_exc())


# process requests in the queue
def process_request(MPQ):
    try:
        while True:
            req = MPQ.get()
            dr = urllib2.urlopen(req)
            MPQ.task_done()

    except Exception, e:
        logging.error(traceback.format_exc())
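
(Why the workers above never exit: each one loops forever on MPQ.get(), which simply blocks once the queue drains, so the daemon processes accumulate with every call. A conventional fix, sketched here as a hypothetical alternative to the Pool-based answer below and assuming the same MPQ and MP_CONCURRENT globals, is to start a fixed set of workers and send one sentinel per worker so each loop knows when to stop.)

def process_request(MPQ):
    while True:
        req = MPQ.get()
        if req is None:
            # sentinel: no more work, leave the loop so the process can exit
            MPQ.task_done()
            break
        try:
            urllib2.urlopen(req)
        except Exception, e:
            logging.error(traceback.format_exc())
        MPQ.task_done()


def request_manager(req_list):
    # start the workers first so the bounded queue cannot deadlock
    workers = []
    for _ in range(MP_CONCURRENT):
        w = multiprocessing.Process(target=process_request, args=(MPQ,))
        w.start()
        workers.append(w)

    for req in req_list:
        MPQ.put(req)

    # one sentinel per worker, then wait for the work and reap the children
    for _ in workers:
        MPQ.put(None)
    MPQ.join()
    for w in workers:
        w.join()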

Solution

  • Ok, after some fiddling (and a good night's sleep) I believe I've figured out the problem (and thank you Eri, you were the inspiration I needed). The main issue behind the zombie processes was that I was never signaling back that a process was finished (and killing it), both of which I (naively) thought multiprocessing handled automagically.

    The code that worked:

    import logging
    import multiprocessing
    import sys
    import traceback
    import urllib2

    # function that will be run through the pool
    def process_request(req):
        try:
            dr = urllib2.urlopen(req, timeout=30)

        except Exception, e:
            logging.error(traceback.format_exc())

    # process killer
    def sig_end(r):
        sys.exit()

    # globals
    MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
    if MP_CONCURRENT < 2:
        MP_CONCURRENT = 2
    CHUNK_SIZE = 20
    POOL = multiprocessing.Pool(MP_CONCURRENT)

    # pool initiator
    def request_manager(req_list):
        try:
            resp = POOL.map_async(process_request, req_list, CHUNK_SIZE, callback=sig_end)

        except Exception, e:
            logging.error(traceback.format_exc())

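    For completeness, a hypothetical call site (the URL is a placeholder, not from the original post):

    reqs = [urllib2.Request("http://example.com/ping?id=%d" % i) for i in range(50)]
    request_manager(reqs)  # returns immediately; the pool fires the requests in the background
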
    A couple of notes:

    1) The function that will be hit by "map_async" ("process_request" in this example) must be defined first, and before the global declarations: the pool's worker processes are created when multiprocessing.Pool() runs, and they must be able to find the function by name at that point.

    2) There is probably a more graceful way to exit the process (suggestions welcome; one possibility is sketched after these notes).

    3) Using a pool in this example really was best (thanks again Eri) due to the "callback" feature, which lets me fire the shutdown signal as soon as all the requests have been handled.
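
    On note 2, a minimal sketch of one arguably cleaner shutdown, untested against this setup: make the pool per-call and let the callback retire it with close() instead of raising SystemExit, so the finished workers are reaped by the pool itself while the call still returns immediately.

    # hypothetical variant of request_manager: per-call pool, closed from the callback
    def request_manager(req_list):
        try:
            pool = multiprocessing.Pool(MP_CONCURRENT)
            # close() stops the pool from accepting work; the workers exit
            # (and are reaped) once the map has been fully dispatched
            pool.map_async(process_request, req_list, CHUNK_SIZE,
                           callback=lambda r: pool.close())

        except Exception, e:
            logging.error(traceback.format_exc())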