I have a script that runs several processes via multiprocessing. These processes do not spawn child processes themselves!
I am now trying to stop the script with a KeyboardInterrupt. I can see that all processes are terminated and joined, but the main process does not die. Why is that? I don't understand it...
Here is the code for the main process:
from position_pipeline import Pipeline as PositionPipeline
from relay_control import RelayControl
from server import run_webserver
from multiprocessing import Process, Value, Queue
import numpy as np
import cv2
import sys
def run_pipeline(pipeline, position_queue):
    """Calibrate on the first video frame, then publish circle positions.

    Opens ``sample-video.mp4``, calibrates the pipeline on the first frame
    and then, for every following frame, detects circles, sorts them into
    "large" and "small" by radius and puts the result on *position_queue*.

    Args:
        pipeline: Object providing the calibration, image-loading, circle
            detection and drawing methods called below.
        position_queue: Queue that receives one dict per frame with the
            keys ``"large"``, ``"small"`` and ``"image"``.

    Raises:
        RuntimeError: If the first frame of the video cannot be read.
    """
    pipeline.load_calibration()
    cap = cv2.VideoCapture("sample-video.mp4")
    try:
        ret, frame = cap.read()
        if not ret:
            raise RuntimeError("Could not read the first frame of sample-video.mp4")

        # Initial calibration on the first frame.
        pipeline.load_image(frame)
        pipeline.preprocess_image()
        pipeline.detect_circles()
        pipeline.calibrate()
        pipeline.store_calibration()

        while True:
            ret, frame = cap.read()
            # cap.read() signals "no more frames" through its first return
            # value; test that flag instead of summing the frame.
            if not ret or frame is None:
                break

            pipeline.load_image(frame)
            pipeline.preprocess_image()
            # TODO: maybe recalibrate every 100 iterations because of
            # autofocus distortion.
            pipeline.detect_circles()
            pipeline.draw_circles()

            large_circles = []
            small_circles = []
            # The first detected circle is skipped -- presumably it is the
            # calibration reference; confirm against the pipeline class.
            for circle in pipeline.circles[1:]:
                pipeline.draw_distance(circle)
                center = (circle[0], circle[1])
                radius = circle[2]
                # Split circles into large and small by radius.
                if 35 < radius < 50:
                    large_circles.append(pipeline.calculate_h_v_distance(center))
                elif 15 < radius < 25:
                    small_circles.append(pipeline.calculate_h_v_distance(center))

            position_queue.put(
                {"large": large_circles, "small": small_circles, "image": pipeline.image}
            )

        pipeline.store_calibration()
    finally:
        # Always free the OpenCV resources, even if a pipeline step raised.
        cv2.destroyAllWindows()
        cap.release()
def run_relay_controller(relay_controller, relay_control_queue, relay_status_queue):
    """Apply relay commands from a queue and report the relay status.

    Blocks until a batch of commands arrives on *relay_control_queue*,
    applies each command, then puts the controller's current status on
    *relay_status_queue*. Putting ``None`` on the control queue asks the
    loop to stop, which allows a graceful shutdown instead of terminate().

    Args:
        relay_controller: Object exposing ``on(channel)``, ``off(channel)``
            and a ``status`` attribute.
        relay_control_queue: Queue yielding iterables of commands, where a
            command is indexable as ``(action, channel)`` and the action is
            ``"on"`` or ``"off"``; ``None`` is the shutdown sentinel.
        relay_status_queue: Queue receiving ``relay_controller.status``
            after every processed batch.
    """
    while True:
        # A blocking get() replaces polling empty() in a tight loop, which
        # would otherwise keep one CPU core busy while idle.
        commands = relay_control_queue.get()
        if commands is None:
            return

        for command in commands:
            action, channel = command[0], command[1]
            if action == "on":
                relay_controller.on(channel)
            elif action == "off":
                relay_controller.off(channel)
            # Unknown actions are ignored.

        relay_status_queue.put(relay_controller.status)
if __name__ == "__main__":
    # Entry point: start the position, relay and web-server workers, relay
    # data between them, and shut everything down on Ctrl+C or when a
    # worker dies.
    position_pipeline = PositionPipeline()
    relay_controller = RelayControl([1, 2, 3, 4, 5, 6, 7, 8])

    relay_control_queue = Queue()
    relay_status_queue = Queue()
    position_queue = Queue()
    frame_queue = Queue()

    position_process = Process(target=run_pipeline, args=(position_pipeline, position_queue))
    relay_process = Process(
        target=run_relay_controller,
        args=(relay_controller, relay_control_queue, relay_status_queue),
    )
    server_process = Process(target=run_webserver, args=())
    workers = (
        ("Position", position_process),
        ("Relay", relay_process),
        ("Server", server_process),
    )
    for _, worker in workers:
        worker.start()

    relay_status = None
    circles = None
    try:
        while True:
            if not position_process.is_alive() or not relay_process.is_alive():
                print("Process died")
                print(f"Position process alive: {position_process.is_alive()}")
                print(f"Relay process alive: {relay_process.is_alive()}")
                break

            # NOTE: placeholder command batch, sent on every pass of the loop.
            relay_control_queue.put([("on", 1), ("off", 2)])

            if not position_queue.empty():
                circles = position_queue.get(block=False)
                # Forward each new image exactly once; re-queuing the same
                # image on every pass would grow frame_queue without bound.
                frame_queue.put(circles["image"])
            if not relay_status_queue.empty():
                relay_status = relay_status_queue.get(block=False)

            # TODO: check whether there are still circles in the image and,
            # if not, initialize a refill (using `circles` / `relay_status`).
    except KeyboardInterrupt:
        print("Keyboard interrupt")
    finally:
        # Stop every worker on all exit paths, not only on Ctrl+C.
        for label, worker in workers:
            worker.terminate()
            worker.join()
            print(f"{label} process killed")
        # Items put on a multiprocessing.Queue are flushed to a pipe by a
        # background feeder thread, and the process joins that thread on
        # exit. Nothing reads these queues any more, so the flush could
        # never finish and the main process would hang. The data is
        # disposable, so skip the join.
        relay_control_queue.cancel_join_thread()
        frame_queue.cancel_join_thread()

    print(
        "Processes alive: ",
        position_process.is_alive(),
        relay_process.is_alive(),
        server_process.is_alive(),
    )
    sys.exit()
And this is the output:
Keyboard interrupt
Position process killed
Relay process killed
Server process killed
Processes alive: False False False
But the main process never gets terminated.
Here is a Minimal Reproducible Example:
from multiprocessing import Process, Queue
import sys
import time
def run_pipeline():
    """Stand-in for the position pipeline: print a heartbeat every 5 seconds, forever."""
    heartbeat = "pipeline running"
    pause_seconds = 5
    while True:
        print(heartbeat)
        time.sleep(pause_seconds)
def run_relay_controller():
    """Stand-in for the relay controller: print a heartbeat every 5 seconds, forever."""
    heartbeat = "relay running"
    pause_seconds = 5
    while True:
        print(heartbeat)
        time.sleep(pause_seconds)
def run_webserver():
    """Stand-in for the web server: print a heartbeat every 5 seconds, forever."""
    heartbeat = "webserver running"
    pause_seconds = 5
    while True:
        print(heartbeat)
        time.sleep(pause_seconds)
if __name__ == "__main__":
    # Reproducer: start three workers, flood a queue that nobody reads,
    # and terminate/join the workers on Ctrl+C before exiting.
    worker_targets = (run_pipeline, run_relay_controller, run_webserver)
    workers = [Process(target=target, args=()) for target in worker_targets]
    for worker in workers:
        worker.start()

    frame_queue = Queue()
    while True:
        try:
            frame_queue.put("placeholder")  # with this line the main process does not terminate
        except KeyboardInterrupt:
            for worker in workers:
                worker.terminate()
            break

    for worker in workers:
        worker.join()
    sys.exit()
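The hang comes from the multiprocessing Queue itself: items you put() are buffered and written to a pipe by a background feeder thread, and when the process exits it waits (joins) for that thread to finish flushing. Since nothing ever reads frame_queue, the flush can never complete and the main process never exits. See https://github.com/python/cpython/issues/91185
A solution would be as follows: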
if __name__ == "__main__":
    # Same reproducer as in the question, with a shutdown sequence that
    # lets the main process exit even though frame_queue is never read.
    position_process = Process(target=run_pipeline, args=())
    relay_process = Process(target=run_relay_controller, args=())
    server_process = Process(target=run_webserver, args=())
    position_process.start()
    relay_process.start()
    server_process.start()

    frame_queue = Queue()
    while True:
        try:
            frame_queue.put("placeholder")  # with this line the main process does not terminate
        except KeyboardInterrupt:
            position_process.terminate()
            relay_process.terminate()
            server_process.terminate()
            break

    position_process.join()
    relay_process.join()
    server_process.join()

    # A single get() removes only one item (and blocks forever if the queue
    # happens to be empty); it does not flush the buffer. The queued data
    # is disposable, so tell the queue not to wait for its feeder thread
    # at exit, then close it.
    frame_queue.cancel_join_thread()
    frame_queue.close()
    sys.exit()
This is not ideal, though. I would recommend sending something like a kill command (a sentinel value) through the queue so that the workers can terminate gracefully, as recommended by the docs.