I am using the Python multiprocessing library. Whenever one of the processes throws a timeout error, my application ends itself. I want to keep the processes up.
I have a function that subscribes to a queue and listens to incoming messages:
def process_msg(i):
    # get new message from the queue
    # process it
    import time
    time.sleep(10)
    return True
I have created a Pool that creates 6 processes and executes the process_msg() function above. When the function times out, I want the Pool to call the function again and wait for new messages instead of exiting:
if __name__ == "__main__":
    import multiprocessing
    from multiprocessing import Pool
    pool = Pool(processes=6)
    collection = range(6)
    val = pool.map_async(process_msg, collection)
    try:
        res = val.get(5)
    except TimeoutError:
        print('timeout here')
    pool.close()
    pool.terminate()
    pool.join()
The code runs and when I get a timeout, the application terminates itself.
What I want it to do is print that the timeout has occurred and call the same function again.
What's the right approach?
Here's a skeleton for a program that works. The main issue you had is the use of pool.terminate(), which "Stops the worker processes immediately without completing outstanding work" (see the documentation).
import time
from multiprocessing import Pool, TimeoutError

def process_msg(i):
    # get new message from the queue
    # process it
    print(f"Starting to sleep, process # {i}")
    time.sleep(10)
    return True

def main():
    print("in main")
    pool = Pool(processes=6)
    collection = range(6)
    print("About to spawn sub processes")
    val = pool.map_async(process_msg, collection)
    while True:
        try:
            print("Waiting for results")
            # Wait up to 3 seconds; on timeout, loop and wait again
            res = val.get(3)
            print(f"Res is {res}")
            break
        except TimeoutError:
            print("Timeout here")
    print("Closing pool")
    pool.close()
    # pool.terminate()  # do not terminate - it kills the child processes
    print("Joining pool")
    pool.join()
    print("exiting main")

if __name__ == "__main__":
    main()

The output of this code is:
in main
About to spawn sub processes
Waiting for results
Starting to sleep, process # 0
Starting to sleep, process # 1
Starting to sleep, process # 2
Starting to sleep, process # 3
Starting to sleep, process # 4
Starting to sleep, process # 5
Timeout here
Waiting for results
Timeout here
Waiting for results
Timeout here
Waiting for results
Res is [True, True, True, True, True, True]
Closing pool
Joining pool
exiting main
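
Note that the skeleton above keeps waiting on the same batch of work; it does not call the function again. If you really want to resubmit the call after a timeout, a minimal sketch could look like the following. The helper name run_with_resubmit and its parameters are my own invention for illustration, not part of multiprocessing. It catches multiprocessing.TimeoutError explicitly, since that is a different class from the built-in TimeoutError.

```python
import multiprocessing


def run_with_resubmit(pool, func, arg, timeout, max_attempts):
    """Hypothetical helper: submit func(arg) to the pool and wait up to
    `timeout` seconds; on timeout, report it and submit the same call
    again, for at most `max_attempts` submissions in total."""
    for attempt in range(1, max_attempts + 1):
        async_result = pool.apply_async(func, (arg,))
        try:
            return async_result.get(timeout)
        # get() raises multiprocessing.TimeoutError, which is not the
        # built-in TimeoutError, so it is named explicitly here.
        except multiprocessing.TimeoutError:
            print(f"Timeout on attempt {attempt} of {max_attempts}")
    # Assumption: giving up with the same exception type is acceptable.
    raise multiprocessing.TimeoutError(
        f"no result after {max_attempts} attempts"
    )
```

In main() this could replace the map_async/get loop, for example run_with_resubmit(pool, process_msg, 0, timeout=3, max_attempts=5). Keep in mind that a timeout in get() does not cancel the task: the earlier call keeps running in its worker, so every resubmission occupies another worker until the old call finishes.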