python, stream, xmodem

How can I limit how much data I read from a file stream in Python?


I've got an embedded system I'm writing a user app against. The user app needs to take a firmware image and split it into chunks suitable for sending to the embedded system for programming. I'm starting with S-record files and using XMODEM for the file transfer (meaning each major 'file' transfer has to be ended with an EOF), so the easiest thing for me to do would be to split the image file into a set of files of full S-records, each no larger than the receive buffer of the (single-threaded) embedded system.

My user app is written in Python. I have a C program that will split the firmware image into properly sized files, but I thought there might be a more 'pythonic' way of going about this, perhaps by using a custom stream handler.

Any thoughts?

Edit: to add to the discussion, I can feed my input file into a buffer. How could I set a hard limit on what goes into the buffer, either by total size or by complete S-record lines ('S'-delimited ASCII text)?
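
For example, something along these lines is roughly what I have in mind (just a sketch; `max_chunk` and `send_via_xmodem` are placeholder names, not anything I actually have):

    def srec_chunks(fileobj, max_chunk):
        """Yield groups of complete S-record lines, each group at most max_chunk bytes."""
        chunk, size = [], 0
        for line in fileobj:                      # one 'S...' record per line
            if chunk and size + len(line) > max_chunk:
                yield ''.join(chunk)              # flush before overflowing the buffer
                chunk, size = [], 0
            chunk.append(line)
            size += len(line)
        if chunk:
            yield ''.join(chunk)                  # final, possibly smaller, chunk

    # with open('firmware.srec') as f:
    #     for chunk in srec_chunks(f, 1024):
    #         send_via_xmodem(chunk)              # placeholder for the actual transfer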


Solution

  • I thought this was an interesting question and the S-record format isn't too complicated, so I wrote an S-record encoder that appears to work from my limited testing.

    import struct

    def s_record_encode(fileobj, recordtype, address, buflen):
        """S-record encode bytes read from a file-like object.

        fileobj      file-like object (binary mode) to read data from (if any)
        recordtype   'S0' to 'S9'
        address      integer address
        buflen       maximum length of the returned record string
        """
        # S-type to (address length in bytes, record carries data)
        record_address_bytes = {
            'S0': (2, True), 'S1': (2, True), 'S2': (3, True), 'S3': (4, True),
            'S5': (2, False), 'S7': (4, False), 'S8': (3, False), 'S9': (2, False)
        }

        # parameters for this record type
        address_len, has_data = record_address_bytes[recordtype]

        # big-endian address as bytes, trimmed to length
        address = struct.pack('>L', address)[-address_len:]

        if has_data:
            # the count byte caps a record at 255 bytes (address + data + checksum);
            # buflen further caps the length of the hex-encoded output string
            max_data = max(0, min(0xff - len(address) - 1,
                                  (buflen - len(recordtype)) // 2 - len(address) - 2))
            data = fileobj.read(max_data)
            if not data:
                return '', 0
        else:
            data = b''

        # byte count covers address + data + checksum
        count = struct.pack('B', len(address) + len(data) + 1)

        # checksum is the one's complement of the low byte of the sum over
        # count + address + data
        checksummed_record = count + address + data
        checksum = struct.pack('B', (sum(checksummed_record) & 0xff) ^ 0xff)

        # glue the record type onto the hex-encoded buffer
        record = recordtype + (checksummed_record + checksum).hex().upper()

        # return the record and how much data was consumed from the file
        return record, len(data)
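
    # Worked example of the checksum (matches the first case in s_record_test below):
    # for an S1 record at address 0x7AF0 carrying 16 data bytes 0A 0A 0D 00 ... 00,
    # count = 2 + 16 + 1 = 0x13, and
    # (0x13 + 0x7A + 0xF0 + 0x0A + 0x0A + 0x0D) & 0xFF = 0x9E,
    # so the checksum byte is 0x9E ^ 0xFF = 0x61.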
    
    
    
    def s_record_test():
        from io import BytesIO

        # from a published example, this should encode to the given string
        fake_file = BytesIO(b"\x0A\x0A\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
        encode_to = "S1137AF00A0A0D0000000000000000000000000061"
        fake_file.seek(0)
        record, datalen = s_record_encode(fake_file, 'S1', 0x7af0, 80)
        print('record   ', record)
        print('encode_to', encode_to)
        assert record == encode_to

        fake_file = BytesIO()
        for i in range(1000):
            fake_file.write(struct.pack('>L', i))
        fake_file.seek(0)

        address = 0

        while True:
            buf, datalen = s_record_encode(fake_file, 'S2', address, 100)
            if not buf:
                break
            print(address, datalen, buf)
            address += datalen
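
To tie the encoder back to the original question, it can be driven in a loop that keeps yielding one properly sized record at a time until the image is exhausted. The sketch below is only an illustration: `encode_image`, the 128-byte buffer size, and the commented-out `firmware.bin` / `send_via_xmodem` names are assumptions, and the actual XMODEM transfer is left out.

    def encode_image(fileobj, base_address, receive_buffer_size):
        """Yield S2 records for a raw binary image, each sized to fit the buffer."""
        address = base_address
        while True:
            record, datalen = s_record_encode(fileobj, 'S2', address,
                                              receive_buffer_size)
            if not record:
                break                             # image exhausted
            yield record
            address += datalen


    if __name__ == '__main__':
        s_record_test()
        # with open('firmware.bin', 'rb') as image:
        #     for record in encode_image(image, 0, 128):
        #         send_via_xmodem(record)         # placeholder for the actual transfer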