
Parse sequences of protobuf messages from contiguous chunks of a fixed-size byte buffer


I've been struggling with this for two days straight, given my poor knowledge of C++. What I need to do is parse sequences of messages with the protobuf C++ API from a big file that may contain millions of such messages. Reading straight from the file is easy, as I can always just call ReadVarint32() to get the size and then call ParseFromCodedStream() with the limit pushed on the CodedInputStream, as described in this post. However, the I/O-level API I am working with (libuv, actually) requires a fixed-size buffer to be allocated for every read callback. That block size has nothing to do with the size of the messages I am reading.

This makes my life hard. Every time I read from the file and fill the fixed-size buffer (say 16K), the buffer will probably contain hundreds of complete protobuf messages, but its last chunk will likely be an incomplete message. So I thought: I should read as many messages as I can, then extract the last chunk and attach it to the beginning of the next 16K buffer I read, and keep going until I reach EOF. I use ReadVarint32() to get the size and compare that number with the remaining buffer size; if the message size is smaller, I keep reading.

There is an API called GetDirectBufferPointer, so I attempt to use it to record the pointer position before I even read the next message's size. However, if I just extract the rest of the byte array from where the pointer starts and attach it to the next chunk, Parse won't succeed; in fact, the first several bytes (8, I think) are completely messed up. I suspect endianness weirdness.

Alternatively, if I call codedStream.ReadRaw() to write the residual stream into the buffer and then attach that to the head of the new chunk, the data doesn't get corrupted. But this time I lose the "size" byte information, as it has already been consumed by ReadVarint32()! And even if I remember the size I read last time and call message.ParseFromCodedStream() directly in the next iteration, it ends up reading one byte too few, and some parts are corrupted, so the object cannot be restored.

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    mCheckBuffer.clear();
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
    mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //Record the pointer location on CIS in bResidueBuffer
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
    &bResidueBufSize);

    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
         cis.ReadVarint32(&size);
    }
    //Have to read this again to get remaining buffer size
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);

    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer, or, it's the end of message 
    //and my buffer just allocated larger so size should be 0
    while (size <= mResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the beginning, 
        //and I just read straight from it hoping to get the message out from 
        //the "size" I got from last iteration, it simply doesn't work
        //(read one less byte in fact, and some part of the message corrupted)
        //push the size constraint to the input stream;
        int limit = cis.PushLimit(size);
        //parse message from the input stream
        message.ParseFromCodedStream(&cis);  
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        printf("%s", str.c_str());
        //do something with the parsed object
        //Now I have to record the new pointer location again
        cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
        &bResidueBufSize);
        //Read another time the next message's size and go back to while loop check
        cis.ReadVarint32(&size);

    }
    //If I do the next line, bResidueBuffer will have the correct CIS information 
    //copied over, but not having the "already read" size info
    cis.ReadRaw(bResidueBuffer, bResidueBufSize);
    mResidueBuffer.clear();
    //I am constructing a new vector that receives the residual chunk of the 
    //current buffer that isn't enough to restore a message
    //If I don't do ReadRaw, this copy completely messes up at least the first 8 
    //bytes of the copied buffer's value, due to I suspect endianness
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0], 
    &bResidueBuffer[bResidueBufSize]);
}

I'm really out of ideas now. Is it even possible to use protobuf gracefully with APIs that require a fixed-size intermediate buffer? Any input is very much appreciated, thanks!


Solution

  • I see two major problems with your code:

    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
    mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));
    

    It looks like you are expecting std::merge to concatenate your buffers, but in fact this function performs a merge of two sorted arrays into a single sorted array in the sense of MergeSort. This doesn't make any sense in this context; mCheckBuffer will end up containing nonsense.
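    To make the difference concrete, here is a minimal sketch that contrasts the two operations on tiny buffers. The helper names concatenate and merged are made up for this illustration; only std::merge and std::vector::insert come from the standard library.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <string>
#include <vector>

// Hypothetical helper: residue bytes first, then the newly read bytes,
// with the original byte order preserved. This is what the question needs.
std::vector<char> concatenate(const std::vector<char>& residue,
                              const std::vector<char>& fresh) {
    std::vector<char> out;
    out.reserve(residue.size() + fresh.size());
    out.insert(out.end(), residue.begin(), residue.end());
    out.insert(out.end(), fresh.begin(), fresh.end());
    assert(out.size() == residue.size() + fresh.size());
    return out;
}

// Hypothetical helper: what the question's code does. std::merge interleaves
// the two (assumed sorted) ranges by value, so byte order is not preserved.
std::vector<char> merged(const std::vector<char>& a,
                         const std::vector<char>& b) {
    std::vector<char> out;
    std::merge(a.begin(), a.end(), b.begin(), b.end(),
               std::back_inserter(out));
    return out;
}
```

    With a residue of {'x','y'} and fresh data of {'a','b'}, concatenate yields "xyab", while merged yields "abxy": the bytes of the new chunk are moved in front of the residue because they compare smaller.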

    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
    &bResidueBufSize);
    

    Here you are casting &bResidueBuffer to an incompatible pointer type. bResidueBuffer is a char array, so &bResidueBuffer is a pointer to a char array, which is not a pointer to a pointer. This is admittedly confusing because arrays can be implicitly converted to pointers (where the pointer points to the first element of the array), but this is actually a conversion -- bResidueBuffer is itself not a pointer, it can just be converted to one.
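    The type distinction can be checked at compile time. The sketch below uses a hypothetical stand-in function, getPointer, for any API that returns a pointer through an out-parameter; it is not the protobuf method itself. Note that a call made through the question's cast would store a pointer value (8 bytes on a typical 64-bit platform) at the address of the array, which is the same address as its first element. That would be consistent with the first 8 bytes looking corrupted, and would point away from endianness as the cause.

```cpp
#include <cassert>
#include <type_traits>

char residue[64];

// &residue is a pointer to the whole array, not a pointer to a pointer.
static_assert(std::is_same<decltype(&residue), char (*)[64]>::value,
              "&residue has type char (*)[64]");
static_assert(!std::is_same<decltype(&residue), const void**>::value,
              "&residue is not a const void**");
// The array only converts (decays) to char*; it is not itself a pointer.
static_assert(std::is_same<std::decay<decltype(residue)>::type, char*>::value,
              "residue decays to char*");

// Hypothetical stand-in for an out-parameter API: it stores a pointer
// value into *out and copies no data.
void getPointer(const void** out, const void* value) { *out = value; }

// Correct usage: pass the address of a real pointer variable.
const void* correctUsage(const void* source) {
    const void* ptr = nullptr;
    getPointer(&ptr, source);
    return ptr;
}
```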

    I think you're also misunderstanding what GetDirectBufferPointer() does. It looks like you want it to copy the rest of the buffer into bResidueBuffer, but the method never copies any data. The method gives you back a pointer that points into the original buffer.

    The correct way to call it is something like:

    const void* ptr;
    int size;
    cis.GetDirectBufferPointer(&ptr, &size);
    

    Now ptr will point into the original buffer. You could now compare this against a pointer to the beginning of the buffer to find out where you are in the stream, like:

    size_t pos = (const char*)ptr - &mCheckBuffer[0];
    

    But, you shouldn't do that, because CodedInputStream already has the method CurrentPosition() for exactly this purpose. That will return the current byte offset in the buffer. So, use that instead.
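    Putting the pieces together, the carry-over approach from the question can be sketched without any protobuf calls at all. This is only an illustration under stated assumptions: the varint decoder is hand-rolled rather than CodedInputStream::ReadVarint32, payloads are treated as opaque bytes, and FrameSplitter, feed, and pendingBytes are names invented here. The key point is that the unconsumed tail is kept together with its length prefix, so the size never has to be remembered across chunks.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hand-rolled base-128 varint decoder (an assumption, standing in for
// ReadVarint32). Returns false if the varint is cut off by the end of the
// data or is longer than 5 bytes; pos is advanced only on success.
bool readVarint32(const std::vector<char>& data, std::size_t& pos,
                  std::uint32_t& value) {
    std::uint32_t result = 0;
    std::size_t p = pos;
    for (int shift = 0; shift < 35; shift += 7) {
        if (p >= data.size()) return false;  // prefix split across chunks
        const std::uint8_t byte = static_cast<std::uint8_t>(data[p++]);
        result |= static_cast<std::uint32_t>(byte & 0x7F) << shift;
        if ((byte & 0x80) == 0) {
            pos = p;
            value = result;
            return true;
        }
    }
    return false;  // malformed; a real implementation should report this
}

// Hypothetical helper that turns fixed-size chunks into complete payloads.
class FrameSplitter {
public:
    // Append one chunk and return every payload that is now complete.
    std::vector<std::string> feed(const char* chunk, std::size_t n) {
        pending_.insert(pending_.end(), chunk, chunk + n);
        std::vector<std::string> frames;
        std::size_t pos = 0;  // start of the first unconsumed frame
        for (;;) {
            std::size_t bodyStart = pos;
            std::uint32_t size = 0;
            if (!readVarint32(pending_, bodyStart, size)) break;
            if (pending_.size() - bodyStart < size) break;  // body incomplete
            frames.emplace_back(pending_.data() + bodyStart, size);
            pos = bodyStart + size;
        }
        // Keep the unconsumed tail, including its length prefix.
        pending_.erase(pending_.begin(),
                       pending_.begin() + static_cast<std::ptrdiff_t>(pos));
        return frames;
    }

    std::size_t pendingBytes() const { return pending_.size(); }

private:
    std::vector<char> pending_;
};
```

    In real code, each returned payload would presumably be handed to the message's ParseFromArray() (or parsed in place to avoid the copy), and feed would be called once per read callback with whatever buffer the I/O layer filled.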