Tags: python, python-3.x, multiprocessing, python-multiprocessing

Python Pool - How do I keep Python Process running when I get a timeout exception?


I am using the Python multiprocessing library. Whenever one of the processes throws a timeout error, my application exits. I want to keep the processes running.

I have a function that subscribes to a queue and listens to incoming messages:

def process_msg(i):
   #get new message from the queue
   #process it
   import time
   time.sleep(10)
   return True

I have created a Pool that creates 6 processes and executes the process_msg() function above. When the function times out, I want the Pool to call the function again and wait for new messages instead of exiting:


if __name__ == "__main__":

    from multiprocessing import Pool

    pool = Pool(processes=6)
    collection = range(6)
    val = pool.map_async(process_msg, collection)
    try:
        res = val.get(5)
    except TimeoutError:
        print('timeout here')
    pool.close()
    pool.terminate()
    pool.join()

The code runs and when I get a timeout, the application terminates itself.

What I want it to do is print that the timeout has occurred and call the same function again.

What's the right approach?


Solution

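  • Here's a skeleton for a program that works. The main issue in your code is the call to pool.terminate(), which "Stops the worker processes immediately without completing outstanding work" (see the documentation). Note also that TimeoutError is imported from multiprocessing: the exception raised by get() is multiprocessing.TimeoutError, which is a different class from the built-in TimeoutError. The loop below does not re-run process_msg; it keeps waiting on the same pending result until it is ready.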

    import time
    from multiprocessing import Pool, TimeoutError

    def process_msg(i):
        # get a new message from the queue
        # process it
        print(f"Starting to sleep, process # {i}")
        time.sleep(10)
        return True

    def main():
        print("in main")
        pool = Pool(processes=6)
        collection = range(6)
        print("About to spawn sub processes")
        val = pool.map_async(process_msg, collection)
        while True:
            try:
                print("Waiting for results")
                res = val.get(3)  # wait up to 3 seconds for all results
                print(f"Res is {res}")
                break
            except TimeoutError:
                # results are not ready yet; the workers keep running
                print("Timeout here")

        print("Closing pool")
        pool.close()
        # pool.terminate()  # do not terminate - it kills the child processes
        print("Joining pool")
        pool.join()
        print("exiting main")

    if __name__ == "__main__":
        main()

    The output of this code is:

    in main
    About to spawn sub processes
    Waiting for results
    Starting to sleep, process # 0
    Starting to sleep, process # 1
    Starting to sleep, process # 2
    Starting to sleep, process # 3
    Starting to sleep, process # 4
    Starting to sleep, process # 5
    Timeout here
    Waiting for results
    Timeout here
    Waiting for results
    Timeout here
    Waiting for results
    Res is [True, True, True, True, True, True]
    Closing pool
    Joining pool
    exiting main
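
If you would rather not loop forever, the same polling idea can be wrapped in a small helper with an upper bound on attempts. This is only a sketch under a few assumptions: wait_for_result, slow_task and demo are hypothetical names, the timings are arbitrary, and ThreadPool is swapped in for Pool purely to keep the example lightweight (it exposes the same map_async/get interface and raises the same multiprocessing.TimeoutError).

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def slow_task(i):
    # Hypothetical stand-in for process_msg: sleep briefly, then report success.
    time.sleep(0.3)
    return True


def wait_for_result(async_result, timeout, max_attempts):
    """Poll async_result.get(timeout) at most max_attempts times.

    Returns (result, number_of_timeouts_seen). If the result is still not
    ready after the last attempt, multiprocessing.TimeoutError propagates.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return async_result.get(timeout), attempt - 1
        except multiprocessing.TimeoutError:
            # Note: this is not the built-in TimeoutError.
            if attempt == max_attempts:
                raise


def demo():
    # ThreadPool is an assumption made for brevity; Pool can be used the
    # same way as long as demo() is called under the __main__ guard.
    pool = ThreadPool(processes=2)
    try:
        pending = pool.map_async(slow_task, range(2))
        return wait_for_result(pending, timeout=0.1, max_attempts=50)
    finally:
        # close() + join() lets outstanding work finish; terminate() would not.
        pool.close()
        pool.join()


if __name__ == "__main__":
    result, timeouts = demo()
    print(f"Result {result} after {timeouts} timeout(s)")
```

The helper only waits on work that was already submitted. If the goal is to submit the function again after a timeout, that would mean calling map_async again in the except branch, which this sketch does not attempt.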