python, protocol-buffers, python-trio

How to use python-trio with google protocol buffer?


I am trying to read some data streams using protobuf in Python, and I want to use Trio to write the client that reads the streams. The protobuf-generated classes have some methods, and I find they do not work when I use Trio streams.

Python client on a Linux machine.

import logging
import struct

import trio

import DTCProtocol_pb2 as Dtc

log = logging.getLogger(__name__)

# create_enc_req() and send_message() are my own helpers (not shown here)
async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to {addr[0]}:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])

    # encoding request
    print("parent: spawning encoding request ...")
    enc_req = create_enc_req(encoding)  # construct encoding request
    await send_message(enc_req, Dtc.ENCODING_REQUEST, client_stream, 'encoding request')  # send encoding request

    log.debug('get_response: started')
    response = await client_stream.receive_some(1024)
    # unpack_from returns a tuple, so unpack the single value out of it
    (m_size,) = struct.unpack_from('<H', response, 0)  # the size of the message
    (m_type,) = struct.unpack_from('<H', response, 2)  # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()

m_body is some bytes data, which I don't know how to decode. Dtc.EncodingResponse() is the protobuf method that should give me an object containing the response in a readable format (Dtc is the module generated from the protobuf file). But I get nothing here. When I ran this script without Trio, Dtc.EncodingResponse() gave the full response in a readable format.
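A minimal sketch of splitting a received buffer into its header fields and body, assuming the 4-byte little-endian header (uint16 size, then uint16 type) that the code above reads. Note that struct.unpack_from always returns a tuple, even for a one-field format like '<H', so a single value has to be unpacked out of it. split_frame is a hypothetical helper for illustration, and it does not check that the whole body has actually arrived:

```python
import struct

# Assumed header layout, as read in the code above: little-endian uint16 size,
# then little-endian uint16 message type, then the serialized protobuf body.
HEADER = struct.Struct('<HH')


def split_frame(data):
    """Hypothetical helper: split one buffer into (m_size, m_type, m_body).

    It does not check that the whole body has arrived yet.
    """
    if len(data) < HEADER.size:
        raise ValueError("need at least 4 bytes for the header")
    # One unpack_from call yields both header fields as plain ints
    m_size, m_type = HEADER.unpack_from(data, 0)
    m_body = bytes(data[HEADER.size:])
    return m_size, m_type, m_body


# unpack_from returns a tuple even for a single field:
print(struct.unpack_from('<H', b'\x07\x00'))  # (7,)
print(split_frame(b'\x05\x00\x07\x00hello'))  # (5, 7, b'hello')
```

The m_body bytes are what a protobuf message object would then parse.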

I am guessing the problem is that client_stream is a Trio stream object that only reads bytes, so I probably need to use a ReceiveChannel object instead. But if that is true, I don't know how to do it.
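A Trio stream, like a plain socket, does hand over an undifferentiated run of bytes; the 4-byte size/type header is what lets a client cut that run back into individual messages before giving each body to protobuf. A minimal sketch of that framing step, assuming (as the code in this thread does) a little-endian uint16 size that counts only the body, followed by a uint16 type. frame_message and FrameBuffer are hypothetical helpers, not part of Trio or protobuf:

```python
import struct

HEADER = struct.Struct('<HH')  # assumed: uint16 body size, uint16 message type


def frame_message(m_type, body):
    """Prefix a serialized body with the assumed 4-byte header."""
    return HEADER.pack(len(body), m_type) + body


class FrameBuffer:
    """Accumulates raw bytes and returns complete (m_type, body) frames.

    Copes with one read returning half a message, or several messages at once.
    """

    def __init__(self):
        self._buf = bytearray()

    def feed(self, data):
        self._buf += data
        frames = []
        while len(self._buf) >= HEADER.size:
            m_size, m_type = HEADER.unpack_from(self._buf, 0)
            end = HEADER.size + m_size
            if len(self._buf) < end:
                break  # body has not fully arrived yet; wait for more data
            frames.append((m_type, bytes(self._buf[HEADER.size:end])))
            del self._buf[:end]  # drop the consumed frame
        return frames


wire = frame_message(7, b"hello") + frame_message(8, b"hi")
fb = FrameBuffer()
print(fb.feed(wire[:6]))  # []  (header plus 2 of the 5 body bytes)
print(fb.feed(wire[6:]))  # [(7, b'hello'), (8, b'hi')]
```

If the server's size field turns out to include the 4 header bytes, the frame end would be m_size rather than 4 + m_size.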

UPDATE: The answer below by Nathaniel J. Smith solves my problem.

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

I feel so silly, but I had not called ParseFromString on the data, and that was all it took. Extremely grateful to everyone who replied. Hope this helps someone out there.


Solution

  • Like @shmee said in the comment, I think your code got mangled some by the edits... you should double-check.

    When I did this script without trio, Dtc.EncodingResponse() would give the full response in readable format

    I think you might have dropped a line when switching to Trio? Dtc.EncodingResponse() just creates a new empty EncodingResponse object. If you want to parse the data from m_body into your new object, you have to do that explicitly, with something like:

    m_resp = Dtc.EncodingResponse()
    m_resp.ParseFromString(m_body)
    

    However, there's another problem... the reason it's called receive_some is that it receives some bytes, but might not receive all the bytes you asked for. Your code assumes that a single call to receive_some will fetch all the bytes in the response, and that might be true when you're doing a simple test, but in general it's not guaranteed. If you don't get enough data on the first call to receive_some, you need to keep calling it until you have all the data.

    This is actually very standard... sockets work the same way. That's why the first thing your server sends is an m_size field – it's so you can tell whether you've gotten all the data or not!

    Unfortunately, as of June 2019, Trio doesn't provide a helper to do this loop for you – you can track progress on that in this issue. In the meantime, it's possible to write your own. I think something like this should work:

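    import struct

    import DTCProtocol_pb2 as Dtc

    async def receive_exactly(stream, count):
        # Keep calling receive_some until exactly `count` bytes have arrived
        buf = bytearray()
        while len(buf) < count:
            new_data = await stream.receive_some(count - len(buf))
            if not new_data:
                # receive_some returns empty bytes once the peer has closed the stream
                raise RuntimeError("other side closed the connection unexpectedly")
            buf += new_data
        return bytes(buf)

    async def receive_encoding_response(stream):
        header = await receive_exactly(stream, 4)
        # 4-byte little-endian header: uint16 size, then uint16 message type
        (m_size, m_type) = struct.unpack('<HH', header)
        # If your server's size field also counts the 4 header bytes,
        # read m_size - 4 here instead (check the protocol spec)
        m_body = await receive_exactly(stream, m_size)
        m_resp = Dtc.EncodingResponse()
        m_resp.ParseFromString(m_body)
        return m_resp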
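A sketch extending the same idea to a stream carrying several message types: m_type, which the code above reads but does not use, selects which parser to apply. To keep it self-contained and testable, it uses a hypothetical in-memory stand-in for a Trio stream (TrickleStream) that hands out at most 2 bytes per receive_some call, and it is driven with asyncio.run because the helpers only await stream.receive_some(); in the real client they would run under trio.run with the stream from trio.open_tcp_stream. The parser mapping and the type numbers 7 and 99 are made up for the demo; with protobuf, a parser could be a generated class's FromString classmethod, e.g. Dtc.EncodingResponse.FromString.

```python
import asyncio
import struct


async def receive_exactly(stream, count):
    """Call receive_some() until exactly `count` bytes have arrived."""
    buf = bytearray()
    while len(buf) < count:
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new_data
    return bytes(buf)


async def receive_message(stream, parsers):
    """Read one framed message; parse its body with parsers[m_type] if present.

    Assumes the 4-byte little-endian header (uint16 body size, uint16 type)
    used above. Bodies of unknown types come back as raw bytes.
    """
    header = await receive_exactly(stream, 4)
    m_size, m_type = struct.unpack('<HH', header)
    m_body = await receive_exactly(stream, m_size)
    parse = parsers.get(m_type)
    return m_type, (parse(m_body) if parse is not None else m_body)


class TrickleStream:
    """Hypothetical test double for a Trio ReceiveStream: at most 2 bytes per call."""

    def __init__(self, data):
        self._data = bytes(data)

    async def receive_some(self, max_bytes=None):
        n = 2 if max_bytes is None else min(2, max_bytes)
        out, self._data = self._data[:n], self._data[n:]
        return out  # b"" once exhausted, like a closed stream


async def demo():
    wire = struct.pack('<HH', 5, 7) + b"hello" + struct.pack('<HH', 3, 99) + b"xyz"
    stream = TrickleStream(wire)
    # Stand-in parser for type 7; with protobuf this could be a FromString classmethod
    parsers = {7: lambda body: body.decode("ascii")}
    results = []
    for _ in range(2):
        results.append(await receive_message(stream, parsers))
    return results


print(asyncio.run(demo()))  # [(7, 'hello'), (99, b'xyz')]
```

Even though every read returns at most 2 bytes, both messages come out whole, which a single receive_some(1024) call would not guarantee.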