Search code examples
pythonbluetoothbluetooth-lowenergypython-asyncio

python script using bluetooth running on windows 11 vs raspberry pi4


I've run into an issue with a python script which performs differently when run on Windows 11 (through VS) versus running on a Raspberry Pi4

The script has been modified from a CLi script found to interact with Victron Energy hardware.

Thanks to @ukbaz, the script ran and returned data from a bluetooth connected Victron device in my van. The script used a line:

loop.run_forever()

Great! works perfectly fine on both machines. I don't want to run forever, just for a 5 second duration, long enough to read the BT device and write to a txt file for future use.

After introducing a change:

 the_end = time.time() + 5
    while time.time() < the_end:

        loop.stop()
        loop.run_forever()

This will run fine on Windows, read BT device, write to file. On the Rpi, script runs with no errors but doesn't read BT or write to file.

Here is the full script

from __future__ import annotations

import inspect
import json
import logging
import time
from enum import Enum
from typing import Set
import asyncio
from typing import List, Tuple
from threading import Thread

from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

from victron_ble.devices import Device, DeviceData, detect_device_type
from victron_ble.exceptions import AdvertisementKeyMissingError, UnknownDeviceError

logger = logging.getLogger(__name__)


class BaseScanner:
    def __init__(self) -> None:
        """Initialize the scanner."""
        self._scanner: BleakScanner = BleakScanner(
            detection_callback=self._detection_callback
        )
        self._seen_data: Set[bytes] = set()

    def _detection_callback(self, device: BLEDevice, advertisement: AdvertisementData):
        # Filter for Victron devices and instant readout advertisements
        data = advertisement.manufacturer_data.get(0x02E1)
        if not data or not data.startswith(b"\x10") or data in self._seen_data:
            return

        # De-duplicate advertisements
        if len(self._seen_data) > 1000:
            self._seen_data = set()
        self._seen_data.add(data)

        self.callback(device, data)

    def callback(self, device: BLEDevice, data: bytes):
        raise NotImplementedError()

    async def start(self):
        await self._scanner.start()

    async def stop(self):
        await self._scanner.stop()


# An ugly hack to print a class as JSON
class DeviceDataEncoder(json.JSONEncoder):
    def default(self, obj):
        if issubclass(obj.__class__, DeviceData):
            data = {}
            for name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
                if name.startswith("get_"):
                    value = method()
                    if isinstance(value, Enum):
                        value = value.name.lower()
                    if value is not None:
                        data[name[4:]] = value
            return data


class Scanner(BaseScanner):
    def __init__(self, device_keys: dict[str, str] = {}):
        super().__init__()
        self._device_keys = {k.lower(): v for k, v in device_keys.items()}
        self._known_devices: dict[str, Device] = {}

    async def start(self):
        logger.info(f"Reading data for {list(self._device_keys.keys())}")
        await super().start()

    def get_device(self, ble_device: BLEDevice, raw_data: bytes) -> Device:
        address = ble_device.address.lower()
        if address not in self._known_devices:
            advertisement_key = self.load_key(address)

            device_klass = detect_device_type(raw_data)
            if not device_klass:
                raise UnknownDeviceError(
                    f"Could not identify device type for {ble_device}"
                )

            self._known_devices[address] = device_klass(advertisement_key)
        return self._known_devices[address]

    def load_key(self, address: str) -> str:
        try:
            return self._device_keys[address]
        except KeyError:
            raise AdvertisementKeyMissingError(f"No key available for {address}")

    def callback(self, ble_device: BLEDevice, raw_data: bytes):
        logger.debug(
            f"Received data from {ble_device.address.lower()}: {raw_data.hex()}"
        )
        try:
            device = self.get_device(ble_device, raw_data)
        except AdvertisementKeyMissingError:
            return
        except UnknownDeviceError as e:
            logger.error(e)
            return
        parsed = device.parse(raw_data)

        blob = {
            "name": ble_device.name,
            "address": ble_device.address,
            "rssi": ble_device.rssi,
            "payload": parsed,
        }
        print(json.dumps(blob, cls=DeviceDataEncoder, indent=1))
        ve_string = json.dumps(blob, cls=DeviceDataEncoder, indent=1)
        print(ve_string)
        #MAC_filename = "this_device" + ".txt"
        #print(f"MAC filename: {MAC_filename}")

        this_file = open("this_device.txt", "w")
        this_file.write(ve_string)
        this_file.close()

        print("file written")
        time.sleep(3)


class DiscoveryScanner(BaseScanner):
    def __init__(self) -> None:
        super().__init__()
        self._seen_devices: Set[str] = set()

    def callback(self, device: BLEDevice, advertisement: bytes):
        if device.address not in self._seen_devices:
            logger.info(f"{device}")
            self._seen_devices.add(device.address)


class DebugScanner(BaseScanner):
    def __init__(self, address: str):
        super().__init__()
        self.address = address

    async def start(self):
        logger.info(f"Dumping advertisements from {self.address}")
        await super().start()

    def callback(self, device: BLEDevice, data: bytes):
        if device.address.lower() == self.address.lower():
            logger.info(f"{time.time():<24}: {data.hex()}")



def my_scan(device_keys: List[Tuple[str, str]]):
    
    loop = asyncio.get_event_loop()

    async def scan(keys):
        scanner = Scanner(keys)
        await scanner.start()

    asyncio.ensure_future(scan({k: v for k, v in device_keys}))

    the_end = time.time() + 5
    while time.time() < the_end:

        loop.stop()
        loop.run_forever()

if __name__ == '__main__':

    my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")])


I have tried moving from

run_forever 

to

run_until_complete

This method failed spectacularly on the Rpi :( so reverted back

Why would the script behave differently between 2 machines and how can I replicate on RPi? Many thanks for any help


Solution

  • I suspect that what you want to do is only exit after the file has been written.

    Maybe you could use the asyncio event capability? https://docs.python.org/3/library/asyncio-sync.html#event

    Create an event in the global scope by placing the following at the top of the file after the imports.

    file_written_event = asyncio.Event()
    
    

    Then add file_written_event.set() after the file is written. e.g.

            this_file = open("this_device.txt", "w")
            this_file.write(ve_string)
            this_file.close()
    
            print("file written")
            file_written_event.set()
    

    The bottom of your file would change the most. It does mostly the same, but has a while loop waiting for the event to happen before exiting.

    async def my_scan(device_keys: List[Tuple[str, str]]):
    
        async def scan(keys):
            scanner = Scanner(keys)
            await scanner.start()
    
        asyncio.ensure_future(scan({k: v for k, v in device_keys}))
        while not file_written_event.is_set():
            await asyncio.sleep(0.1)
    
    
    if __name__ == '__main__':
        logging.basicConfig()
        logger.setLevel(logging.DEBUG)
        asyncio.run(my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")]))