I am trying to read some data streams using protobuf in Python, and I want to use Trio to build the client that reads the streams. The protobuf has some method calls, and I find they do not work when I use Trio streams.
Python client on a Linux machine.
```python
import logging
import struct

import trio

import DTCProtocol_pb2 as Dtc

log = logging.getLogger(__name__)


async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to 127.0.0.1:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])
    # encoding request
    print("parent: spawning encoding request ...")
    enc_req = create_enc_req(encoding)  # construct encoding request (my own helper, defined elsewhere)
    await send_message(enc_req, Dtc.ENCODING_REQUEST, client_stream, 'encoding request')  # send encoding request (my own helper)
    log.debug('get_response: started')
    response = await client_stream.receive_some(1024)
    m_size = struct.unpack_from('<H', response[:2])[0]  # the size of the message (unpack_from returns a tuple)
    m_type = struct.unpack_from('<H', response[2:4])[0]  # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()
```
`m_body` would be some bytes data, which I don't know how to decode. `Dtc.EncodingResponse()` is the protobuf call which should give me a Dtc object containing the response in a readable format (Dtc is the generated protobuf module). But I get nothing here. When I did this script without Trio, `Dtc.EncodingResponse()` would give the full response in a readable format.
I am guessing the problem is that `client_stream` is a Trio stream object that only reads bytes, so I probably need to use a `ReceiveChannel` object instead. But if this is true, I don't know how to do this.
UPDATE: The answer below by Nathaniel J. Smith solves my problem.
```python
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
```
I feel so silly: I hadn't called `ParseFromString` on the data before, and that was all it took. Extremely grateful to everyone who replied. Hope this helps someone out there.
Like @shmee said in the comment, I think your code got mangled some by the edits... you should double-check.
> When I did this script without trio, `Dtc.EncodingResponse()` would give the full response in readable format
I think you might have dropped a line when switching to Trio? `Dtc.EncodingResponse()` just creates a new, empty `EncodingResponse` object. If you want to parse the data from `m_body` into your new object, you have to do that explicitly, with something like:
```python
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
```
However, there's another problem... the reason it's called `receive_some` is that it receives *some* bytes, but might not receive all the bytes you asked for. Your code assumes that a single call to `receive_some` will fetch all the bytes in the response. That might be true when you're doing a simple test, but in general it's not guaranteed. If you don't get enough data on the first call to `receive_some`, you have to keep calling it until you have all the data.
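To see the partial-read behavior without a network connection, here is a minimal sketch using a hypothetical in-memory `FakeChunkedStream` that stands in for a Trio stream and hands back at most a few bytes per `receive_some` call. It only illustrates the behavior described above; it is not how Trio is implemented. It uses just the standard library, and `asyncio.run` drives the coroutines because the fake stream never actually blocks.

```python
import asyncio


class FakeChunkedStream:
    """Hypothetical stand-in for a Trio stream: returns at most `chunk_size`
    bytes per receive_some() call, the way a real TCP connection may when
    data arrives in several segments."""

    def __init__(self, data: bytes, chunk_size: int):
        self._data = data
        self._chunk_size = chunk_size

    async def receive_some(self, max_bytes=None):
        # Hand back up to chunk_size bytes, and never more than max_bytes.
        limit = self._chunk_size
        if max_bytes is not None:
            limit = min(limit, max_bytes)
        piece, self._data = self._data[:limit], self._data[limit:]
        return piece  # b"" once everything has been delivered, like EOF


async def count_calls_needed(stream, total: int) -> int:
    """Call receive_some() until `total` bytes have arrived; return the call count."""
    received = 0
    calls = 0
    while received < total:
        piece = await stream.receive_some(1024)
        calls += 1
        if not piece:
            raise RuntimeError("stream ended early")
        received += len(piece)
    return calls


message = bytes(10)  # 10 placeholder bytes standing in for one message
first = asyncio.run(FakeChunkedStream(message, chunk_size=4).receive_some(1024))
print(len(first))  # 4: a single call got only part of the message
print(asyncio.run(count_calls_needed(FakeChunkedStream(message, 4), len(message))))  # 3
```

Even though each call asked for up to 1024 bytes, the 10-byte message needed three calls to arrive in full.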
This is actually very standard... sockets work the same way. That's why the first thing your server sends is an `m_size` field: it's so you can tell whether you've gotten all the data or not!
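The size prefix is what lets you decide whether a frame is complete. Below is a minimal sketch that assumes the same 4-byte little-endian header the question's code unpacks (uint16 size, then uint16 type), and also assumes the size counts only the body bytes. `make_frame` and `frame_is_complete` are hypothetical helpers, and the type value 7 is an arbitrary placeholder, not a real DTC message type. The sketch also shows that `struct.unpack_from` always returns a tuple, even for a single field.

```python
import struct

HEADER = struct.Struct('<HH')  # little-endian: uint16 size, uint16 type


def make_frame(m_type: int, body: bytes) -> bytes:
    """Hypothetical helper: prefix `body` with a 4-byte (size, type) header.
    Assumes the size field counts only the body bytes."""
    return HEADER.pack(len(body), m_type) + body


def frame_is_complete(buf: bytes) -> bool:
    """True once `buf` holds the header plus as many body bytes as m_size says."""
    if len(buf) < HEADER.size:
        return False  # not even the header has arrived yet
    m_size, _m_type = HEADER.unpack_from(buf)
    return len(buf) >= HEADER.size + m_size


frame = make_frame(7, b'hello')
print(struct.unpack_from('<H', frame))  # (5,): unpack_from always returns a tuple
print(frame_is_complete(frame[:6]))     # False: only 2 of the 5 body bytes so far
print(frame_is_complete(frame))         # True
```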
Unfortunately, as of June 2019, Trio doesn't provide a helper to do this loop for you; you can track progress on that in this issue. In the meantime, it's possible to write your own. I think something like this should work:
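```python
import struct

import DTCProtocol_pb2 as Dtc


async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        # receive_some may return fewer bytes than requested, so keep looping
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new_data
    return bytes(buf)


async def receive_encoding_response(stream):
    header = await receive_exactly(stream, 4)
    # two little-endian uint16s: message size, then message type
    (m_size, m_type) = struct.unpack('<HH', header)
    # Assumes m_size is the body length only; if your protocol's size
    # field includes the 4-byte header, read m_size - 4 bytes instead.
    m_body = await receive_exactly(stream, m_size)
    m_resp = Dtc.EncodingResponse()
    m_resp.ParseFromString(m_body)  # parse the body bytes, not the size
    return m_resp
```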
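A different design, sketched here under the same header assumption (little-endian uint16 size then uint16 type, size counting only the body), is to keep one growing buffer and peel complete frames off it. That also copes with a single `receive_some` call returning the end of one message plus the start of the next. `FrameBuffer` is a hypothetical name, not part of Trio or protobuf; you would still pass each body to `ParseFromString` on the protobuf class that matches its type.

```python
import struct

HEADER = struct.Struct('<HH')  # little-endian: uint16 size, uint16 type


class FrameBuffer:
    """Hypothetical helper: accumulate raw bytes and hand back complete frames.
    Assumes the size field counts only the body bytes."""

    def __init__(self):
        self._buf = bytearray()

    def feed(self, data: bytes) -> list:
        """Add newly received bytes; return any complete (m_type, body) pairs."""
        self._buf += data
        frames = []
        while len(self._buf) >= HEADER.size:
            m_size, m_type = HEADER.unpack_from(self._buf)
            end = HEADER.size + m_size
            if len(self._buf) < end:
                break  # body not fully here yet; wait for more data
            frames.append((m_type, bytes(self._buf[HEADER.size:end])))
            del self._buf[:end]  # drop the frame we just consumed
        return frames


two = HEADER.pack(3, 1) + b'abc' + HEADER.pack(2, 2) + b'xy'
fb = FrameBuffer()
print(fb.feed(two[:5]))  # []: header plus 1 body byte, nothing complete yet
print(fb.feed(two[5:]))  # [(1, b'abc'), (2, b'xy')]
```

In a Trio read loop you would call `fb.feed(await stream.receive_some(...))` repeatedly and treat an empty result from `receive_some` as the peer closing the connection.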