I am working on the implementation of the serial driver of the EEG (time-series acquisition device). The device encodes the data with 12bits x 26 total channels with a sampling rate of 200Hz
The serial data stream consists of signalling byte 0xA0 followed by 45 bytes that carry the data for 26 channels, each encoded with 12bits.
But here is the catch, these 12bits are not in fixed positions in the 45byte block. The first byes use only 4 LSB, whilst the rest 44 7 LSB.
To make this more illustrative I will try to represent it graphically below. Suppose that we have started the amplifier and it always gives us 4095 (max int value represented with 12bits) for all channels (so we have all "ones" for the data), then we have something like this:
a0 0f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f a0 next sample...
This has to be mapped to the int(1,...,26) with values 4095.
So, I made a python code, that first finds the beginning of the block, then it saves everything in one int/long, then I removed the bits on fixed positions, append 8 most significant 0 bits to make a 16bit representation and convert the byte array to a list of integers.
That works fine, but the problem is the speed. Seems that the code takes a considering amount of time for a single sample and it has to do it 200 times in one second. Let's include some other delays of the real serial read methods, everything has to stay much below 1sec for all 200 samples
#Python code
def readByte():
#mockup
return 0xA0
def read45bytes():
return int(0x0f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f)
def remove_bit(num, i):
mask = num >> (i + 1)
mask = mask << i
right = ((1 << i) - 1) & num
return mask | right
def insert_mult_bits(num, bits, len, i):
mask = num >> i
mask = (mask << len) | bits
mask = mask << i
right = ((1 << i) - 1) & num
return right | mask
def main():
while(readByte()!=0xA0):
print("Searching for the beginning of the packet of 45 bytes...")
print("Beginning of the packet of 45 bytes found")
#read whole sample
sample=read45bytes()
#remove unused bits
corr=0;
for i in range(7, sample.bit_length(), 8):
sample=remove_bit(sample,i-corr);
corr=corr+1;
#add HSB to make 2byte representation
corr=0;
for i in range(12,sample.bit_length(),12):
sample=insert_mult_bits(sample,0,4,i+corr)
corr=corr+4;
#convert to bytes 26channels x 2 bytes, bigendian
bt=sample.to_bytes(26*2,'big');
#assign the result to int list
idx=0;
out=[];
for i in range(0,26*2-1,2):
out.append(int(int((bt[i]<<8 | bt[i+1]))))
idx=idx+1;
#print first sample of the channel 1
print(out.pop(0))
code00.py:
#!/usr/bin/env python
import sys
import math
import io
import itertools as it
START_MARKER = b"\xA0"
START_MARKER_LEN = len(START_MARKER)
BIT_VALUE_MASK = list(2 ** i for i in range(7, -1, -1))
IGNORED_BITS_INDEXES = (7,)
def chunk_size(channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
libi = len(ignored_bits_indexes)
#if libi > 7:
# raise ValueError
bits = channel_count * bits_per_channel
bpb = 8 - libi
q, r = divmod(bits, bpb)
r += ignored_heading_bits
return q + math.ceil(r / 8)
def byte_2_bits(byte):
return [1 if (byte & i) else 0 for i in BIT_VALUE_MASK]
def bits_2_val(bits):
return sum(2 ** idx if bit == 1 else 0 for idx, bit in enumerate(bits[::-1]))
def decode_chunk(chunk, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
bit_lists = [reversed(byte_2_bits(b)) for b in chunk[::-1]]
bits = list(it.chain(*bit_lists))
channels = []
cur_chan_bits = []
for idx, bit in enumerate(bits[:-ignored_heading_bits]):
if idx % 8 in ignored_bits_indexes:
continue
cur_chan_bits.append(bit)
if len(cur_chan_bits) == bits_per_channel:
channels.append(bits_2_val(cur_chan_bits[::-1]))
cur_chan_bits = []
if cur_chan_bits:
raise ValueError("Something went wrong while decoding: ", cur_chan_bits)
return channels[::-1]
def read_data(stream, channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
while 1:
t = stream.read(START_MARKER_LEN)
if not t:
break
if t != START_MARKER:
continue
print("Start marker...")
size = chunk_size(channel_count=channel_count, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
chunk = stream.read(size)
if len(chunk) == size:
decoded = decode_chunk(chunk, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
print("Decoded: {:}\n".format(decoded))
print("End of data.")
def main(*argv):
# 1st chunk is the one in the question, I played a bit with next ones
b = START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7E" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x60\x01" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x5F\x7F" \
+ START_MARKER + b"\x00\x00\x3F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"
read_data(io.BytesIO(b))
if __name__ == "__main__":
print("Python {:s} {:03d}bit on {:s}\n".format(" ".join(elem.strip() for elem in sys.version.split("\n")),
64 if sys.maxsize > 0x100000000 else 32, sys.platform))
rc = main(*sys.argv[1:])
print("\nDone.")
sys.exit(rc)
Notes:
This approach (almost) doesn't use bit operations, instead it handles bits in a number (byte) as a list of numbers (possible values: 0, 1)
Decoding (on a complete chunk of data):
Reverse:
All the bytes in the chunk
All the bits in each byte
to get the chunk bits in reversed order
Traverse the bit list (skipping 7th bit of each byte), and when 12 bits are encountered, convert them in reversed order (to "undo" the bit reverse from #1.2.) to a channel value which is added to the channel list
Return the channel list in reversed order (to "undo" the byte reverse from #1.1.)
Some utility functions (pretty straightforward I guess) are used
Better error handling could be added
Output:
py_pc064_03_08_test0) [cfati@cfati-5510-0:/mnt/e/Work/Dev/StackOverflow/q069660629]> python code00.py Python 3.8.10 (default, Sep 28 2021, 16:10:42) [GCC 9.3.0] 064bit on linux Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095] Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094] Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 1] Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094, 4095] Start marker... Decoded: [0, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095] End of data. Done.