Essentially I'm using the socketserver Python library to handle communications between a central server and multiple Raspberry Pi 4 and ESP32 peripherals. Currently I have the socketserver running serve_forever; the request handler then calls a method of a ProcessManager class, which starts a process that should handle the actual communication with the client.
It works fine if I call .join() on the process so that the ProcessManager method doesn't exit, but that's not how I would like it to run. Without .join() I get a broken pipe error as soon as the client communication process tries to send a message back to the client.
This is the process manager class. It gets defined in the main file, and buildprocess is called through the request handler of the socketserver class:
import multiprocessing as mp
mp.allow_connection_pickling()
import queuemanager as qm
import hostmain as hmain
import camproc
import keyproc
import controlproc


# method that gets called into a process so that class and socket share memory
def callprocess(periclass, peritype, clientsocket, inqueue, genqueue):
    periclass.startup(clientsocket)


class ProcessManager(qm.QueueManager):

    def wipeproc(self, target):
        # TODO make wipeproc integrate with the queue manager rather than directly to the class
        for macid in list(self.procdict.keys()):
            if target == macid:
                # calls proc kill for the class
                try:
                    self.procdict[macid]["class"].prockill()
                except Exception as e:
                    print("exception:", e, "in wipeproc")
                # waits for process to exit naturally (class threads to close)
                self.procdict[macid]["process"].join()
                # remove dict entry for this macid
                self.procdict.pop(macid)

    # called externally to create the new process and append to procdict
    def buildprocess(self, peritype, macid, clientsocket):
        # TODO put some logic here to handle the differences of the controller process
        # generates queue object
        inqueue = mp.Queue()
        # creates periclass instance based on type
        if peritype == hmain.cam:
            periclass = camproc.CamMain(self, inqueue, self.genqueue)
        elif peritype == hmain.keypad:
            print("to be added to")
        elif peritype == hmain.motion:
            print("to be added to")
        elif peritype == hmain.controller:
            print("to be added to")
        # init and start call for the new process
        self.procdict[macid] = {"type": peritype, "inqueue": inqueue, "class": periclass, "process": None}
        self.procdict[macid]["process"] = mp.Process(
            target=callprocess,
            args=(self.procdict[macid]["class"], self.procdict[macid]["type"], clientsocket,
                  self.procdict[macid]["inqueue"], self.genqueue))
        self.procdict[macid]["process"].start()
        # updating the process dictionary before class obj gets appended
        # if macid in list(self.procdict.keys()):
        #     self.wipeproc(macid)
        print(self.procdict)
        print("client added")
To my eye, all the pertinent objects should be stored in the procdict dictionary, but as I mentioned I just get a broken pipe error unless I join the process with self.procdict[macid]["process"].join() before the end of the buildprocess method.
I would like it to exit the method but leave the communication process running as is. I've tried a few different things, restructuring what gets defined inside and outside the process, but to no avail. Thus far I haven't been able to find any pertinent solutions online, but of course I may have missed something too.
Thank you for reading this far if you did! I've been stuck on this for a couple of days, so any help would be appreciated. This is my first project with multiprocessing and sockets on any sort of scale.
#################
Edit to include pastebin with all the code:
"Without .join() i get a broken pipe error as soon as the client communication process tries to send a message back to the client."
That's because when the request handler's handle() returns, the socketserver shuts down the connection. Because a shutdown acts on the connection itself rather than on one file descriptor, the copy of the socket passed to your process stops working too. socketserver simplifies the task of writing network servers, which means it automatically does certain things that are usually done in the course of handling a network request. Your code is not quite using socketserver as intended. In particular, the asynchronous mixins are intended for handling requests asynchronously. With the ForkingMixIn the server will spawn a new process for each request, whereas your current code does this itself with mp.Process. So, I think you have basically two options:
- Make use of the socketserver methods as intended.
- Don't use socketserver at all, so it won't get in the way.
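For the first option, a minimal sketch of what using ForkingMixIn could look like is below. It is not your actual code: PeripheralHandler, ForkingServer, demo and the line-based "ack:" protocol are made-up names for illustration, and it assumes a POSIX host, since ForkingMixIn relies on os.fork. The point is that the whole conversation with the client stays inside handle(), which runs in a child process forked for that connection, so the parent can go back to accepting clients without anything like .join().

```python
import socket
import socketserver


class PeripheralHandler(socketserver.StreamRequestHandler):
    """Hypothetical handler: the whole client conversation lives in handle()."""

    def handle(self):
        # With ForkingMixIn this runs in a child process forked for this
        # connection. The connection stays usable until handle() returns.
        for line in self.rfile:
            message = line.strip()
            if message == b"quit":
                break
            self.wfile.write(b"ack:" + message + b"\n")


class ForkingServer(socketserver.ForkingMixIn, socketserver.TCPServer):
    allow_reuse_address = True


def demo():
    # Port 0 asks the OS for any free port; loopback is used for the demo only.
    with ForkingServer(("127.0.0.1", 0), PeripheralHandler) as server:
        with socket.create_connection(server.server_address, timeout=5) as client:
            # The connection is already queued, so this accepts it, forks the
            # child and returns in the parent right away. A real server would
            # call server.serve_forever() instead.
            server.handle_request()
            replies = []
            with client.makefile("rb") as reader:
                for msg in (b"hello", b"status"):
                    client.sendall(msg + b"\n")
                    replies.append(reader.readline().strip())
            client.sendall(b"quit\n")
    return replies


if __name__ == "__main__":
    print(demo())
```

In the parent, the mixin only closes its own copy of the accepted socket after forking; the shutdown of the connection happens in the child once handle() has returned. For the second option you would write the accept loop yourself and decide when each side closes its copy of the socket.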