Search code examples
pythonmultithreadingpython-multithreadingxml-rpcsimplexmlrpcserver

Implement Per-Function Maximum Number of Concurrent Calls to Python XML-RPC Server


I have a Python-based SimpleXMLRPCServer similar to this:

from multiprocessing import Process
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import SocketServer

class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
    pass


# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
    rpc_paths = ('/RPC2',)


def main():
    server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
    server.register_function(tester1)
    server.register_function(tester2)

    print("Server running...")
    server.serve_forever()


def tester1(id):
    p = Process(target=my_func1, args=(id,))
    p.start()

    return True

def tester2(id):
    p = Process(target=my_func2, args=(id,))
    p.start()

    return True

I want to implement a method to keep track of how many concurrent processes are currently being executed for tester1 and tester2, and if more than a maximum (user-defined) number of them are still executing, then queue each new request and execute when the number drops below the threshold.

Maybe a shared Pool for each function?


Solution

  • The following seems to do what I was asking for:

    from multiprocessing import Process, JoinableQueue
    from SimpleXMLRPCServer import SimpleXMLRPCServer
    from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
    import SocketServer
    import threading
    
    tester1_queue = JoinableQueue()
    tester2_queue = JoinableQueue()
    tester1_max_concurrent = 10
    tester2_max_concurrent = 10
    
    class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
        pass
    
    
    # Restrict to a particular path.
    class RequestHandler(SimpleXMLRPCRequestHandler):
        rpc_paths = ('/RPC2',)
    
    
    def main():
        # spin up tester1 queue watcher threads
        for i in range(tester1_max_concurrent):
            worker = threading.Thread(target=tester1_queue_watcher, args=(tester1_queue,))
            worker.daemon = True
            worker.start()
    
        # spin up tester2 queue watcher threads
        for i in range(tester2_max_concurrent):
            worker = threading.Thread(target=tester2_queue_watcher, args=(tester2_queue,))
            worker.daemon = True
            worker.start()
    
        server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
        server.register_function(tester1_handler, 'tester1')
        server.register_function(tester2_handler, 'tester2' )
    
        print("Server running...")
        server.serve_forever()
    
    def tester1_handler(id):
        tester1_queue.put((id,))
        return True
    
    def tester1_queue_watcher(q):
        while True:
            id = q.get()
            p = Process(target=tester1, args=(id,))
            p.start()
            p.join()
            q.task_done()
    
    def tester1(id):
        # do stuff
    
    def tester2_handler(id):
        tester2_queue.put((id,))
        return True
    
    def tester2_queue_watcher(q):
        while True:
            id = q.get()
            p = Process(target=tester2, args=(id,))
            p.start()
            p.join()
            q.task_done()
    
    def tester2(id):
        # do stuff