Search code examples
pythonstreamgrpc

How do i handle streaming messages with Python gRPC


I'm following this Route_Guide sample.

The sample in question fires off and reads messages without replying to a specific message. The latter is what i'm trying to achieve.

Here's what i have so far:

import grpc
...

channel = grpc.insecure_channel(conn_str)
try:
    grpc.channel_ready_future(channel).result(timeout=5)
except grpc.FutureTimeoutError:
    sys.exit('Error connecting to server')
else:
    stub = MyService_pb2_grpc.MyServiceStub(channel)
    print('Connected to gRPC server.')
    this_is_just_read_maybe(stub)


def this_is_just_read_maybe(stub):
    responses = stub.MyEventStream(stream())
    for response in responses:
        print(f'Received message: {response}')
        if response.something:
            # okay, now what? how do i send a message here?

def stream():
    yield my_start_stream_msg
    # this is fine, i receive this server-side
    # but i can't check for incoming messages here

I don't seem to have a read() or write() on the stub, everything seems to be implemented with iterators.

How do i send a message from this_is_just_read_maybe(stub)? Is that even the right approach?

My Proto is a bidirectional stream:

service MyService {
  rpc MyEventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
}

Solution

  • What you're trying to do is perfectly possible and will probably involve writing your own request iterator object that can be given responses as they arrive rather than using a simple generator as your request iterator. Perhaps something like

    class MySmarterRequestIterator(object):
    
        def __init__(self):
            self._lock = threading.Lock()
            self._responses_so_far = []
    
        def __iter__(self):
            return self
    
        def _next(self):
            # some logic that depends upon what responses have been seen
            # before returning the next request message
            return <your message value>
    
        def __next__(self):  # Python 3
            return self._next()
    
        def next(self):  # Python 2
            return self._next()
    
        def add_response(self, response):
            with self._lock:
                self._responses.append(response)
    

    that you then use like

    my_smarter_request_iterator = MySmarterRequestIterator()
    responses = stub.MyEventStream(my_smarter_request_iterator)
    for response in responses:
        my_smarter_request_iterator.add_response(response)
    

    . There will probably be locking and blocking in your _next implementation to handle the situation of gRPC Python asking your object for the next request that it wants to send and your responding (in effect) "wait, hold on, I don't know what request I want to send until after I've seen how the next response turned out".