Search code examples
pythonbit-manipulationdeserializationbitwise-operatorsdata-stream

Extract 12 bits that carry information from a data stream in Python


I am working on the implementation of the serial driver of the EEG (time-series acquisition device). The device encodes the data with 12bits x 26 total channels with a sampling rate of 200Hz

The serial data stream consists of signalling byte 0xA0 followed by 45 bytes that carry the data for 26 channels, each encoded with 12bits.

But here is the catch, these 12bits are not in fixed positions in the 45byte block. The first byes use only 4 LSB, whilst the rest 44 7 LSB.

To make this more illustrative I will try to represent it graphically below. Suppose that we have started the amplifier and it always gives us 4095 (max int value represented with 12bits) for all channels (so we have all "ones" for the data), then we have something like this:

a0 0f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f a0 next sample...

This has to be mapped to the int(1,...,26) with values 4095.

enter image description here

So, I made a python code, that first finds the beginning of the block, then it saves everything in one int/long, then I removed the bits on fixed positions, append 8 most significant 0 bits to make a 16bit representation and convert the byte array to a list of integers.

That works fine, but the problem is the speed. Seems that the code takes a considering amount of time for a single sample and it has to do it 200 times in one second. Let's include some other delays of the real serial read methods, everything has to stay much below 1sec for all 200 samples

#Python code
def readByte():
#mockup
    return 0xA0

def read45bytes():
    return      int(0x0f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f)


def remove_bit(num, i):
    mask = num >> (i + 1)
    mask = mask << i
    right = ((1 << i) - 1) & num
    return mask | right

def insert_mult_bits(num, bits, len, i):
    mask = num >> i
    mask = (mask << len) | bits
    mask = mask << i
    right = ((1 << i) - 1) & num
    return right | mask



def main():

    while(readByte()!=0xA0):
        print("Searching for the beginning of the packet of 45 bytes...")

    print("Beginning of the packet of 45 bytes found")



    #read whole sample
    sample=read45bytes()

    #remove unused bits
    corr=0;
    for i in range(7, sample.bit_length(), 8):
        sample=remove_bit(sample,i-corr);
        corr=corr+1;

    #add HSB to make 2byte representation
    corr=0;
    for i in range(12,sample.bit_length(),12):
        sample=insert_mult_bits(sample,0,4,i+corr)
        corr=corr+4;

    #convert to bytes 26channels x 2 bytes, bigendian
    bt=sample.to_bytes(26*2,'big');

    #assign the result to int list
    idx=0;
    out=[];
    for i in range(0,26*2-1,2):
        out.append(int(int((bt[i]<<8 | bt[i+1]))))
        idx=idx+1;

    #print first sample of the channel 1
    print(out.pop(0))

Solution

  • code00.py:

    #!/usr/bin/env python
    
    import sys
    import math
    import io
    import itertools as it
    
    
    START_MARKER = b"\xA0"
    START_MARKER_LEN = len(START_MARKER)
    
    BIT_VALUE_MASK = list(2 ** i for i in range(7, -1, -1))
    
    IGNORED_BITS_INDEXES = (7,)
    
    
    def chunk_size(channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
        libi = len(ignored_bits_indexes)
        #if libi > 7:
        #    raise ValueError
        bits = channel_count * bits_per_channel
        bpb = 8 - libi
        q, r = divmod(bits, bpb)
        r += ignored_heading_bits
        return q + math.ceil(r / 8)
    
    
    def byte_2_bits(byte):
        return [1 if (byte & i) else 0 for i in BIT_VALUE_MASK]
    
    
    def bits_2_val(bits):
        return sum(2 ** idx if bit == 1 else 0 for idx, bit in enumerate(bits[::-1]))
    
    
    def decode_chunk(chunk, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
        bit_lists = [reversed(byte_2_bits(b)) for b in chunk[::-1]]
        bits = list(it.chain(*bit_lists))
        channels = []
        cur_chan_bits = []
        for idx, bit in enumerate(bits[:-ignored_heading_bits]):
            if idx % 8 in ignored_bits_indexes:
                continue
            cur_chan_bits.append(bit)
            if len(cur_chan_bits) == bits_per_channel:
                channels.append(bits_2_val(cur_chan_bits[::-1]))
                cur_chan_bits = []
        if cur_chan_bits:
            raise ValueError("Something went wrong while decoding: ", cur_chan_bits)
        return channels[::-1]
    
    
    def read_data(stream, channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
        while 1:
            t = stream.read(START_MARKER_LEN)
            if not t:
                break
            if t != START_MARKER:
                continue
            print("Start marker...")
            size = chunk_size(channel_count=channel_count, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
            chunk = stream.read(size)
            if len(chunk) == size:
                decoded = decode_chunk(chunk, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
                print("Decoded: {:}\n".format(decoded))
        print("End of data.")
    
    
    def main(*argv):
                             # 1st chunk is the one in the question, I played a bit with next ones
        b =   START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F" \
            + START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7E" \
            + START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x60\x01" \
            + START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x5F\x7F" \
            + START_MARKER + b"\x00\x00\x3F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"
    
        read_data(io.BytesIO(b))
    
    
    if __name__ == "__main__":
        print("Python {:s} {:03d}bit on {:s}\n".format(" ".join(elem.strip() for elem in sys.version.split("\n")),
                                                       64 if sys.maxsize > 0x100000000 else 32, sys.platform))
        rc = main(*sys.argv[1:])
        print("\nDone.")
        sys.exit(rc)
    

    Notes:

    • This approach (almost) doesn't use bit operations, instead it handles bits in a number (byte) as a list of numbers (possible values: 0, 1)

    • Decoding (on a complete chunk of data):

      1. Reverse:

        1. All the bytes in the chunk

        2. All the bits in each byte

        to get the chunk bits in reversed order

      2. Traverse the bit list (skipping 7th bit of each byte), and when 12 bits are encountered, convert them in reversed order (to "undo" the bit reverse from #1.2.) to a channel value which is added to the channel list

      3. Return the channel list in reversed order (to "undo" the byte reverse from #1.1.)

    • Some utility functions (pretty straightforward I guess) are used

    • Better error handling could be added

    Output:

    py_pc064_03_08_test0) [cfati@cfati-5510-0:/mnt/e/Work/Dev/StackOverflow/q069660629]> python code00.py 
    Python 3.8.10 (default, Sep 28 2021, 16:10:42) [GCC 9.3.0] 064bit on linux
    
    Start marker...
    Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095]
    
    Start marker...
    Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094]
    
    Start marker...
    Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 1]
    
    Start marker...
    Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094, 4095]
    
    Start marker...
    Decoded: [0, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095]
    
    End of data.
    
    Done.