Search code examples
pythonsocketsmultiprocessingpython-multiprocessingsocketserver

broken pipe error with python multiprocessing and socketserver


Essentially Im using the socketserver python library to try and handle communications from a central server to multiple raspberry pi4 and esp32 peripherals. Currently i have the socketserver running serve_forever, then the request handler calls a method from a processmanager class which starts a process that should handle the actual communication with the client.

It works fine if i use .join() on the process such that the processmanager method doesnt exit, but thats not how i would like it to run. Without .join() i get a broken pipe error as soon as the client communication process tries to send a message back to the client.

This is the process manager class, it gets defined in the main file and buildprocess is called through the request handler of the socketserver class:

import multiprocessing as mp
mp.allow_connection_pickling()

import queuemanager as qm
import hostmain as hmain


import camproc
import keyproc

import controlproc


# method that gets called into a process so that class and socket share memory
def callprocess(periclass, peritype, clientsocket, inqueue, genqueue):

    periclass.startup(clientsocket)


class ProcessManager(qm.QueueManager):

    def wipeproc(self, target):

# TODO make wipeproc integrate with the queue manager rather than directly to the class
        for macid in list(self.procdict.keys()):
            if target == macid:
                # calls proc kill for the class
                try:
                    self.procdict[macid]["class"].prockill()
                except Exception as e:
                    print("exception:", e, "in wipeproc")
                    
                # waits for process to exit naturally (class threads to close)
                self.procdict[macid]["process"].join()
                # remove dict entry for this macid
                self.procdict.pop(macid)
    



    # called externally to create the new process and append to procdict
    def buildprocess(self, peritype, macid, clientsocket):
        # TODO put some logic here to handle the differences of the controller process
        
        # generates queue object
        inqueue = mp.Queue()

        # creates periclass instance based on type
        if peritype == hmain.cam:
            periclass = camproc.CamMain(self, inqueue, self.genqueue)
        elif peritype == hmain.keypad:
            print("to be added to")
        elif peritype == hmain.motion:
            print("to be added to")
        elif peritype == hmain.controller:
            print("to be added to")
        
        # init and start call for the new process
        self.procdict[macid] = {"type": peritype, "inqueue": inqueue, "class": periclass, "process": None}
 
        self.procdict[macid]["process"] = mp.Process(target=callprocess, 
        args=(self.procdict[macid]["class"], self.procdict[macid]["type"], clientsocket, self.procdict[macid]["inqueue"], self.genqueue))

        self.procdict[macid]["process"].start()
        
        # updating the process dictionary before class obj gets appended
#        if macid in list(self.procdict.keys()):
#            self.wipeproc(macid)
        
            

        print(self.procdict)
        print("client added")

to my eye, all the pertinent objects should be stored in the procdict dictionary but as i mentioned it just gets a broken pipe error unless i join the process with self.procdict[macid]["process"].join() before the end of the buildprocess method

I would like it to exit the method but leave the communication process running as is, ive tried a few different things with restructuring what gets defined within the process and without, but to no avail. Thus far i havent been able to find any pertinent solutions online but of course i may have missed something too.

Thankyou for reading this far if you did! Ive been stuck on this for a couple days so any help would be appreciated, this is my first project with multiprocessing and sockets on any sort of scale.

#################

Edit to include pastebin with all the code:

https://pastebin.com/u/kadytoast/1/PPWfyCFT


Solution

  • Without .join() i get a broken pipe error as soon as the client communication process tries to send a message back to the client.

    That's because at the time when the request handler handle() returns, the socketserver does shutdown the connection. That socketserver simplifies the task of writing network servers means it does certain things automatically which are usually done in the course of network request handling. Your code is not quite making the intended use of socketserver. Especially, for handling requests asynchronously, Asynchronous Mixins are intended. With the ForkingMixIn the server will spawn a new process for each request, in contrast to your current code which does this by itself with mp.Process. So, I think you have basically two options:

    • code less of the request handling yourself and use the provided socketserver methods
    • stay with your own handling and don't use socketserver at all, so it won't get in the way.