Getting byte offsets in msgpack 0.6


In my scenario I need to allow random access to single items serialized with msgpack. I.e. given a binary file and an item index, I want to jump to exactly that position in the file and deserialize this item.

To get the byte offset of each item I use the unpack method of msgpack.Unpacker. In msgpack 0.5, unpack accepts an optional argument write_bytes, a hook that is called with the raw data string before it is deserialized. Taking the len of this string gives me the size of the item in bytes, which lets me accumulate the byte offset.

Since msgpack 0.6 the write_bytes argument is no longer accepted, and I have not found a replacement that gives me either the raw input string or the number of bytes consumed after reading an item.

Here is the function that I use to create the index. It returns the index as a list of byte offsets, where each entry index[i] contains the byte offset of item i. The critical part is the unpacker.unpack(write_bytes=hook) call, which no longer accepts this argument.

def index_from_recording(filename):
    # create empty index
    index = []

    # hook that keeps track of the byte offset of the `msgpack.Unpacker`
    hook = ByteOffsetHook()

    with open(filename, "rb") as f:
        # create the `msgpack.Unpacker`
        unpacker = msgpack.Unpacker(f)
        try:
            while True:
                # add current offset to index
                index.append(hook.offset)

                # unpack (and discard) next item.
                # The `hook` keeps track of the read bytes
                unpacker.unpack(write_bytes=hook)  # <== `write_bytes` not accepted since 0.6
        except msgpack.OutOfData:
            pass

    return index

The ByteOffsetHook is defined as follows. The hook simply counts the len of the raw input string and accumulates it.

class ByteOffsetHook(object):
    def __init__(self):
        self.offset = 0

    def __call__(self, data):
        self.offset += len(data)

For debugging you can use this function to generate a dummy recording.

import os

import msgpack


def serialize_dummy_recording(filename):
    with open(filename, "wb") as f:
        for serialized_sample in [msgpack.packb({'x': i}) for i in range(10)]:
            f.write(serialized_sample)


def main():
    filename = "test.rec"
    if not os.path.exists(filename):
        serialize_dummy_recording(filename)

    index = index_from_recording(filename)
    print(index)


if __name__ == "__main__":
    main()

Solution

  • I found out that the tell method of an Unpacker returns its current byte offset. However, this behavior is not described in the latest documentation that I could find. Also, the write_bytes parameter is not declared as deprecated there, although the commit that removed the parameter mentions a deprecation.

    The working function to create the index now looks as follows:

    def index_from_recording(filename):
        # create empty index
        index = []
    
        with open(filename, "rb") as f:
            # create the `msgpack.Unpacker`
            unpacker = msgpack.Unpacker(f)
            try:
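                while True:
                    # remember the offset at which the next item starts
                    offset = unpacker.tell()
    
                    # unpack (and discard) next item; raises `OutOfData` at end of file
                    unpacker.unpack()
    
                    # record the offset only once an item was actually read
                    index.append(offset)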
            except msgpack.OutOfData:
                pass
    
        return index
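
For completeness, here is a minimal sketch of the random-access read that such an index is meant to enable. The helper name load_item is hypothetical and not part of msgpack. It assumes that index is a list of byte offsets like the one returned by index_from_recording, and it passes raw=False (an assumption on my side) so that msgpack strings come back as str rather than bytes.

```python
import msgpack


def load_item(filename, index, i):
    """Deserialize only item `i`, given a list of byte offsets `index`.

    `load_item` is a hypothetical helper for illustration; it is not part
    of the msgpack API.
    """
    with open(filename, "rb") as f:
        # jump straight to the first byte of item `i`
        f.seek(index[i])

        # a fresh `msgpack.Unpacker` starts reading at the current file
        # position; `raw=False` (assumption) decodes strings to `str`
        unpacker = msgpack.Unpacker(f, raw=False)

        # read exactly one item and ignore whatever follows it
        return unpacker.unpack()
```

With the dummy recording from the question, load_item("test.rec", index, 3) should give back the fourth item. Note that the Unpacker may buffer more bytes from the file than the single item needs, so this sketch opens the file and creates a new Unpacker for every lookup instead of reusing one.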