
How can each client get its own state when a grpc-python server holds a class instance?


I want to use grpc-python in the following scenario, but I don't know how to implement it.

On the Python server, a class instance calculates and updates its own state, then sends that state to the corresponding client. On the client side, several clients communicate with the server, and each one should get its own result without interference from the others.

Specifically, suppose a class starts with self.i = 0. Each time a client calls the class's update function, it executes self.i = self.i + 1 and returns self.i. In practice, two clients call update concurrently: for example, client1 makes its third call while client2 makes its first, so client1 should receive 3 and client2 should receive 1.
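To make the expected behaviour concrete, here is a minimal sketch in plain Python with no gRPC involved. The Counter class, the run_calls helper and the client names are hypothetical stand-ins for the class and clients described above; the sketch only contrasts one shared instance with one instance per client.

```python
class Counter:
    """Hypothetical stand-in for the stateful class in the question."""

    def __init__(self):
        self.i = 0

    def update(self):
        self.i = self.i + 1
        return self.i


def run_calls(counter_for, calls):
    """Replay the calls in order; counter_for maps a client name to its Counter."""
    return [(client, counter_for(client).update()) for client in calls]


# client1 calls update three times; client2's first call arrives last.
calls = ["client1", "client1", "client1", "client2"]

# One instance shared by everyone: client2 sees client1's updates.
shared = Counter()
shared_results = run_calls(lambda client: shared, calls)

# One instance per client: each client only sees its own count.
per_client = {}
isolated_results = run_calls(
    lambda client: per_client.setdefault(client, Counter()), calls
)

print(shared_results)
print(isolated_results)
```

With the shared instance, client2's first call returns 4; with one instance per client, it returns 1, which is the behaviour I am after.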

I think this could be solved by creating a thread for each client to avoid conflicts: a new client gets a new thread, and an existing client reuses its existing thread. But I don't know how to implement this.

Hope you can help me. Thanks in advance.


Solution

  • I think I solved this problem myself. If you have a better solution, please post it here.

    I edited the helloworld example from the grpc-python introduction to illustrate my goal.

    For helloworld.proto

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_package = "io.grpc.examples.helloworld";
    option java_outer_classname = "HelloWorldProto";
    option objc_class_prefix = "HLW";
    
    package helloworld;
    
    // The greeting service definition.
    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
      rpc Unsubscribe (HelloRequest) returns (HelloReply) {}
    }
    
    // The request message containing the user's name.
    message HelloRequest {
      string name = 1;
    }
    
    // The response message containing the greetings
    message HelloReply {
      string message = 1;
    }
    

    I added an Unsubscribe function to allow a specific client to disconnect from the server.

    In hello_server.py

    import grpc
    import helloworld_pb2
    import helloworld_pb2_grpc
    import threading
    from threading import RLock
    import time
    from concurrent import futures
    import logging
    
    
    class Calcuate:
        def __init__(self):
            self.i = 0
    
        def add(self):
            self.i+=1
            return self.i
    
    
    class PeerSet(object):
        def __init__(self):
            self._peers_lock = RLock()
            self._peers = {}
            self.instances = {}
    
        def connect(self, peer):
            with self._peers_lock:
                if peer not in self._peers:
                    print("Peer {} connecting".format(peer))
                    self._peers[peer] = 1
                    a = Calcuate()
                    self.instances[peer] = a
                    output = a.add()
                    return output
                else:
                    self._peers[peer] += 1
                    a = self.instances[peer]
                    output = a.add()
                    return output
    
    
        def disconnect(self, peer):
            print("Peer {} disconnecting".format(peer))
            with self._peers_lock:
                if peer not in self._peers:
                    raise RuntimeError("Tried to disconnect peer '{}' but it was never connected.".format(peer))
                del self._peers[peer]
                del self.instances[peer]
    
        def peers(self):
            with self._peers_lock:
                # Return a snapshot so callers never touch the dict outside the lock.
                return list(self._peers)
    
    
    class Greeter(helloworld_pb2_grpc.GreeterServicer):
    
        def __init__(self):
            self._peer_set = PeerSet()
    
        def _record_peer(self, context):
            return self._peer_set.connect(context.peer())
    
        def SayHello(self, request, context):
            output = self._record_peer(context)
            print("[thread {}] Peers: {}, output: {}".format(threading.current_thread().ident, self._peer_set.peers(), output))
            time.sleep(1)
            return helloworld_pb2.HelloReply(message='Hello, {}, {}!'.format(request.name, output))
    
        def Unsubscribe(self, request, context):
            self._peer_set.disconnect(context.peer())
            return helloworld_pb2.HelloReply(message='{} disconnected!'.format(context.peer()))
    
    
    
    def serve():
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        server.wait_for_termination()
    
    
    if __name__ == '__main__':
        logging.basicConfig()
        serve()
    
    

    The use of context.peer() is adapted from Richard Belleville's answer in this post. You can replace add() with any other method that updates the instance's state.
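One caveat worth keeping in mind: as far as I understand, context.peer() describes the connection (typically an address and port), so a client that reconnects is likely to show up as a new peer. If that matters, a possible variation is to let each client send its own identifier as call metadata and fall back to context.peer() otherwise. The sketch below assumes a hypothetical metadata key named client-id, and client_key is a hypothetical helper; FakeContext is only a test double that mimics the two servicer context methods used here, so the snippet runs without a server.

```python
CLIENT_ID_KEY = "client-id"  # hypothetical metadata key, not defined by gRPC


def client_key(context):
    """Return the client-supplied ID if present, else the peer string."""
    # invocation_metadata() yields (key, value) pairs sent by the client.
    for key, value in context.invocation_metadata():
        if key == CLIENT_ID_KEY:
            return value
    return context.peer()


class FakeContext:
    """Test double exposing only the two context methods used above."""

    def __init__(self, peer, metadata=()):
        self._peer = peer
        self._metadata = tuple(metadata)

    def peer(self):
        return self._peer

    def invocation_metadata(self):
        return self._metadata


with_id = FakeContext("ipv4:127.0.0.1:50001", [(CLIENT_ID_KEY, "alice")])
without_id = FakeContext("ipv4:127.0.0.1:50002")
print(client_key(with_id))
print(client_key(without_id))
```

In the server above, _record_peer could then call self._peer_set.connect(client_key(context)), and a client would pass metadata=(("client-id", "alice"),) when calling stub.SayHello.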

    In hello_client.py

    from __future__ import print_function
    
    import logging
    
    import grpc
    import helloworld_pb2
    import helloworld_pb2_grpc
    
    
    def run():
        # NOTE(gRPC Python Team): .close() is possible on a channel and should be
        # used in circumstances in which the with statement does not fit the needs
        # of the code.
        with grpc.insecure_channel('localhost:50051') as channel:
            stub = helloworld_pb2_grpc.GreeterStub(channel)
            response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
            print("Greeter client received: " + response.message)
            response = stub.SayHello(helloworld_pb2.HelloRequest(name='Tom'))
            print("Greeter client received: " + response.message)
            response = stub.SayHello(helloworld_pb2.HelloRequest(name='Jerry'))
            print("Greeter client received: " + response.message)
            stub.Unsubscribe(helloworld_pb2.HelloRequest(name="end"))
    
    
    if __name__ == '__main__':
        logging.basicConfig()
        run()
    
    

    If we run several instances of hello_client.py simultaneously, the server distinguishes the clients and sends each one its own result.
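The isolation can also be checked without starting a server. The sketch below is a simulation, not the server code: it reuses the same idea (one instance per peer, guarded by a lock) and replaces the gRPC clients with threads. The Registry class, the simulate function and the fake peer strings are all hypothetical names introduced for this example.

```python
import threading


class Calculate:
    def __init__(self):
        self.i = 0

    def add(self):
        self.i += 1
        return self.i


class Registry:
    """Hypothetical, trimmed-down version of the per-peer bookkeeping."""

    def __init__(self):
        self._lock = threading.Lock()
        self._instances = {}

    def update(self, peer):
        # The lock covers both the lookup/creation and the state update.
        with self._lock:
            if peer not in self._instances:
                self._instances[peer] = Calculate()
            return self._instances[peer].add()


def simulate(num_clients=5, calls_per_client=3):
    """Run one thread per fake client and collect what each one received."""
    registry = Registry()
    results = {}

    def client(peer):
        results[peer] = [registry.update(peer) for _ in range(calls_per_client)]

    threads = [
        threading.Thread(target=client, args=("fake-peer-{}".format(n),))
        for n in range(num_clients)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    for peer, values in sorted(simulate().items()):
        print(peer, values)
```

Each fake client should receive its own sequence starting at 1, regardless of how the threads interleave, because every peer has a separate Calculate instance.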