Search code examples
pythonraspberry-piusbchecksumraspberry-pi-pico

How can I do bidirectional communication over USB with checksum for a Raspberry Pi 5 and Raspberry Pi Pico (serial, UART, other?)


I’m working on a project that involves a Raspberry Pi 5 running a Python-based GUI and an RP2040 (running MicroPython) connected to a set of I2C sensors and 32 servos controlled by two PCA9685 PWM controllers.

My goal is to collect data from the sensors at a frequency of at least 1Hz, but preferably 10Hz+. The data consists of approximately 256 float variables from these sensors, and I need to verify the integrity of the data with a checksum.

Additionally, I need to update the 32 servos at a frequency between 1-10Hz, passing 4-digit integers to them. (0-4096)

As a proof of concept, I tried using serial communication between a Raspberry Pi 5 and Raspberry Pi Pico, with the Pico connected to one PCA9685 I2C breakout board. However, I’ve been encountering timing issues, missing/corrupted data, and problems getting the checksum to work properly.

I’m wondering if I’m being too ambitious with this solution, or if there are better alternatives out there. Any advice would be greatly appreciated. Thanks in advance to anyone who reads this.

Here’s the code I’ve been using:

Raspberry Pi 5 Code:

import serial
import json
import hashlib
from random import randint

def compute_checksum(data):
    return hashlib.md5(data.encode()).hexdigest()

def main():
    s = serial.Serial(port="/dev/ttyACM0", baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=2)  # reduced timeout
    s.flush()

    while True:
        try:
            data = [randint(0, 4096) for _ in range(16)]  # replace with your actual data
            json_data = json.dumps(data)
            checksum = compute_checksum(json_data)
            s.write((json_data + '|' + checksum + '\r').encode())
            s.flush()  # flush the buffer
            if s.in_waiting > 0:
                response = s.read_until().strip()
                print('Received:', response.decode())
        except Exception as e:
            print(f"An error occurred: {e}")
            break

if __name__ == "__main__":
    main()

Raspberry Pi Pico Code:

import board
import busio
import ujson
import select
import sys
from adafruit_pca9685 import PCA9685

# Create the I2C bus interface.
i2c = busio.I2C(board.GP17, board.GP16)    # Pi Pico RP2040

# Create a simple PCA9685 class instance.
pca = PCA9685(i2c)

# Set the PWM frequency to 60hz.
pca.frequency = 1000

def compute_checksum(data):
    h = uhashlib.md5()
    h.update(data)
    return ubinascii.hexlify(h.digest()).decode()

# Set up the poll object
poll_obj = select.poll()
poll_obj.register(sys.stdin, select.POLLIN)

# Loop indefinitely
while True:
    try:
        # Wait for input on stdin
        poll_results = poll_obj.poll(0)  # the '0' is how long it will wait for message before looping again (in microseconds)
        if poll_results:
            # Read the data from stdin (read data coming from PC)
            received_data = sys.stdin.readline().strip()
            data, received_checksum = received_data.split('|')
            try:
                values = ujson.loads(data)
                computed_checksum = compute_checksum(data)
                if computed_checksum == received_checksum:
                    for i in range(16):
                        pca.channels[i].duty_cycle = values[i]
                    sys.stdout.write("received data: " + data + "\r")
                else:
                    sys.stdout.write("checksum error\r")
            except ValueError:
                sys.stdout.write("invalid json: " + data + "\r")
    except Exception as e:
        print(f"An error occurred: {e}")
        break


Solution

  • I have two suggestions:

    • transmit your data faster, by increasing the baud rate, or
    • transmit less data, by packing more efficiently

    Let's look at each idea in a separate section. You can use either or both ideas - the first being the simpler.


    Your baud rate of 9,600 bits/s is pretty slow and most modern devices can manage much more than that. When you convert the bits to bytes, that is only 1200 bytes/s. Given that you will lose a couple of bits for parity and stop/start for every byte, you will only get around 1000 bytes/s.

    I would suggest you try:

    s = serial.Serial(port="/dev/ttyACM0", baudrate=115200 ...
    

    Failing that, step down through 96000, 57600 etc.


    Secondly, your packing is inefficient. When you convert your list to JSON and add a hex digest, it takes around 130 bytes. You can only transmit that 7 times a second at 1000 bytes/s. I would suggest a much more compact frame as follows:

    ff ff <16 off 2-byte shorts> CRC
    

    where ff ff is a header, followed by your 16 values as 2-bytes unsigned shorts and an 8-bit CRC. That makes a 35 bytes frame, or around 75% less than you currently have.

    Here is some code to pack a list of values into a frame and unpack a frame into a list of values with checking:

    #!/usr/bin/env python3
    
    import sys
    import crc8
    import struct
    import random
    import binascii
    
    
    def Pack(values):
       """Packs the list of values into a packet with checksum"""
    
       # The 32 byte payload of 16 values
       payload = struct.pack('!16H',*values)
    
       # Calculate CRC over payload
       CRC = crc8.crc8()
       CRC.update(payload)
    
       # Assemble header + payload + CRC into a packet
       packet = b'\xff\xff' + payload + CRC.digest()
       return packet
    
    def Unpack(packet):
       """Unpacks a packet and returns a list of values or empty list if CRC incorrect or other error"""
    
       # Extract components of packet
       hdr = packet[:2]
       payload = packet[2:34]
       packetCRC = packet[-1]
    
       # Check length of packet is correct
       if len(packet) != 35:
          print(f'ERROR: Packet length incorrect, received {len(packet)} bytes, expected 35', file=sys.stderr)
          return []
    
       # Check header is as expected
       if hdr != b'\xff\xff':
          print(f'ERROR: Packet header incorrect, received {hdr}, expected <ff> <ff>', file=sys.stderr)
          return []
    
       # Check CRC
       CRC = crc8.crc8()
       CRC.update(payload)
       if packetCRC != int.from_bytes(CRC.digest()):
          print(f'ERROR: Packet CRC incorrect', file=sys.stderr)
          return []
    
       # Everything ok, unpack and return list of 16 values
       values = struct.unpack('!16H',payload)
       return values
    
    if __name__ == "__main__":
       # Generate list of random values
       values = [ random.randint(0,65535) for i in range(16)]
       print('Initial list: ', *values)
    
       # Assemble values into a packet 
       packet = Pack(values)
    
       print("<<<UNPACK>>>\n\n")
       # Unpack values from packet
       result = Unpack(packet)
       print('Unpacked list: ', *values)
    

    Your code will need to change so that you read/write exactly 35 bytes rather than looking for newlines and JSON strings.

    I put the header on there so you can re-sync if there are any errors. Basically, if you call Unpack() and get an empty list back, you have lost sync. In that case you need to read until you get ff twice in a row and then read the following 33 bytes (payload plus CRC) and pass it (plus a header of ff ff) to Unpack() to get re-synced.


    Note that your data values can be represented in 12 bits rather than 16. So, rather than packing each sample into 16 bits for 2 bytes/value, you could pack 2 samples into 24 bits for 1.5 bytes/value. It doesn't currently seem worth the effort to do that bit-shifting though.