I'm following this Route_Guide sample.
The sample in question fires off and reads messages without replying to a specific message. The latter is what i'm trying to achieve.
Here's what i have so far:
import grpc
...
channel = grpc.insecure_channel(conn_str)
try:
grpc.channel_ready_future(channel).result(timeout=5)
except grpc.FutureTimeoutError:
sys.exit('Error connecting to server')
else:
stub = MyService_pb2_grpc.MyServiceStub(channel)
print('Connected to gRPC server.')
this_is_just_read_maybe(stub)
def this_is_just_read_maybe(stub):
responses = stub.MyEventStream(stream())
for response in responses:
print(f'Received message: {response}')
if response.something:
# okay, now what? how do i send a message here?
def stream():
yield my_start_stream_msg
# this is fine, i receive this server-side
# but i can't check for incoming messages here
I don't seem to have a read()
or write()
on the stub, everything seems to be implemented with iterators.
How do i send a message from this_is_just_read_maybe(stub)
?
Is that even the right approach?
My Proto is a bidirectional stream:
service MyService {
rpc MyEventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
}
What you're trying to do is perfectly possible and will probably involve writing your own request iterator object that can be given responses as they arrive rather than using a simple generator as your request iterator. Perhaps something like
class MySmarterRequestIterator(object):
def __init__(self):
self._lock = threading.Lock()
self._responses_so_far = []
def __iter__(self):
return self
def _next(self):
# some logic that depends upon what responses have been seen
# before returning the next request message
return <your message value>
def __next__(self): # Python 3
return self._next()
def next(self): # Python 2
return self._next()
def add_response(self, response):
with self._lock:
self._responses.append(response)
that you then use like
my_smarter_request_iterator = MySmarterRequestIterator()
responses = stub.MyEventStream(my_smarter_request_iterator)
for response in responses:
my_smarter_request_iterator.add_response(response)
. There will probably be locking and blocking in your _next
implementation to handle the situation of gRPC Python asking your object for the next request that it wants to send and your responding (in effect) "wait, hold on, I don't know what request I want to send until after I've seen how the next response turned out".