Tags: python, struct, enums, construct

How to get construct.GreedyRange to give back a Byte?


Okay, suppose I have this working exactly as expected:

from enum import IntEnum
from construct import *

class Char(IntEnum):
    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD

MAPPING = Mapping(Byte, {x: x+1 for x in Char})

SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte), 
            Renamed(MAPPING, 'x')
        ),
        Byte
    )
)

Example:

>>> buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
>>> SLIP.build(buffer)
b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'

And:

>>> from operator import eq
>>> all(map(eq, SLIP.parse(SLIP.build(buffer)), buffer))
True
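
For cross-checking, the same escaping can be sketched in plain Python without construct. The names slip_escape and slip_unescape are made up for this illustration and are not part of construct; the decoder mirrors the Select fallback by treating an ESC that is not followed by a mapped byte as an ordinary byte.

```python
# Plain-Python sketch of the byte stuffing that SLIP performs above.
# slip_escape / slip_unescape are hypothetical helper names.
START, STOP, ESC = 0xAB, 0xBC, 0xCD

# Same table as MAPPING: each special byte is sent as ESC followed by byte + 1.
ESCAPED = {START: START + 1, STOP: STOP + 1, ESC: ESC + 1}
UNESCAPED = {v: k for k, v in ESCAPED.items()}


def slip_escape(data: bytes) -> bytes:
    """Replace every special byte with the two-byte sequence ESC, byte + 1."""
    out = bytearray()
    for b in data:
        if b in ESCAPED:
            out += bytes([ESC, ESCAPED[b]])
        else:
            out.append(b)
    return bytes(out)


def slip_unescape(data: bytes) -> bytes:
    """Undo slip_escape; an ESC without a mapped follower is kept as-is."""
    out = bytearray()
    i = 0
    while i < len(data):
        if data[i] == ESC and i + 1 < len(data) and data[i + 1] in UNESCAPED:
            out.append(UNESCAPED[data[i + 1]])
            i += 2
        else:
            out.append(data[i])
            i += 1
    return bytes(out)


buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
encoded = slip_escape(buffer)
decoded = slip_unescape(encoded)
```

Here encoded matches the output of SLIP.build(buffer) shown above, and decoded equals buffer.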

Now I need to wrap the encode/decode inside another struct:

PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Renamed(SLIP, 'message'),
    Const(Char.STOP, Byte)
)

The build works exactly as expected:

>>> PROTOCOL.build(buffer)
b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'

However, when parsing, GreedyRange consumes one byte too many (it swallows the STOP byte as data, so the final Const has nothing left to read):

>>> PROTOCOL.parse(b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc')
construct.core.StreamError: stream read less than specified amount, expected 1, found 0

How can I get GreedyRange to give back a byte?


Solution

  • The solution is NullTerminated(..., term=STOP). When parsing, it reads the underlying stream up to the terminator into an internal buffer and parses the subcon from that buffer, so GreedyRange never sees the STOP byte.

    PROTOCOL = FocusedSeq(
        'message',
        Const(Char.START, Byte),
        message=NullTerminated(
            # NOTE build emits the whole message, then appends STOP
            # NOTE parse consumes the stream until STOP and passes that buffer to GreedyRange
            GreedyRange(
                Select(
                    FocusedSeq(
                        'x',
                        Const(Char.ESC, Byte),
                        x=MAPPING  # NOTE intentionally raises MappingError
                    ),
                    Byte  # NOTE fallback for MappingError
                )
            ),
            term=Byte.build(Char.STOP)
        )
    )
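
The idea behind this fix (cut the payload at STOP first, then decode only the buffered payload) can be sketched in plain Python. This is an illustration of the approach under the same byte values as the question, not construct's implementation; frame, unframe, and the escape helpers are hypothetical names.

```python
# Sketch of "read until STOP, then decode the buffer" without construct.
# All function names here are hypothetical helpers for illustration.
START, STOP, ESC = 0xAB, 0xBC, 0xCD
ESCAPED = {START: START + 1, STOP: STOP + 1, ESC: ESC + 1}
UNESCAPED = {v: k for k, v in ESCAPED.items()}


def slip_escape(data: bytes) -> bytes:
    out = bytearray()
    for b in data:
        out += bytes([ESC, ESCAPED[b]]) if b in ESCAPED else bytes([b])
    return bytes(out)


def slip_unescape(data: bytes) -> bytes:
    out = bytearray()
    i = 0
    while i < len(data):
        if data[i] == ESC and i + 1 < len(data) and data[i + 1] in UNESCAPED:
            out.append(UNESCAPED[data[i + 1]])
            i += 2
        else:
            out.append(data[i])
            i += 1
    return bytes(out)


def frame(payload: bytes) -> bytes:
    """START + escaped payload + STOP, like PROTOCOL.build."""
    return bytes([START]) + slip_escape(payload) + bytes([STOP])


def unframe(packet: bytes) -> bytes:
    """Find the terminator first, then decode only the bytes before it."""
    if packet[:1] != bytes([START]):
        raise ValueError("missing START byte")
    end = packet.find(bytes([STOP]), 1)
    if end < 0:
        raise ValueError("missing STOP byte")
    # The escaped payload never contains a raw STOP byte, so the first
    # STOP after START is the terminator.
    return slip_unescape(packet[1:end])


buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
packet = frame(buffer)
message = unframe(packet)
```

Because the decoder only ever sees packet[1:end], it cannot run past the terminator, which is the property the greedy parser lacked.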