I've run into an issue with a python script which performs differently when run on Windows 11 (through VS) versus running on a Raspberry Pi4
The script has been modified from a CLi script found to interact with Victron Energy hardware.
Thanks to @ukbaz, the script ran and returned data from a bluetooth connected Victron device in my van. The script used a line:
loop.run_forever()
Great! works perfectly fine on both machines. I don't want to run forever, just for a 5 second duration, long enough to read the BT device and write to a txt file for future use.
After introducing a change:
the_end = time.time() + 5
while time.time() < the_end:
loop.stop()
loop.run_forever()
This will run fine on Windows, read BT device, write to file. On the Rpi, script runs with no errors but doesn't read BT or write to file.
Here is the full script
from __future__ import annotations
import inspect
import json
import logging
import time
from enum import Enum
from typing import Set
import asyncio
from typing import List, Tuple
from threading import Thread
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from victron_ble.devices import Device, DeviceData, detect_device_type
from victron_ble.exceptions import AdvertisementKeyMissingError, UnknownDeviceError
logger = logging.getLogger(__name__)
class BaseScanner:
def __init__(self) -> None:
"""Initialize the scanner."""
self._scanner: BleakScanner = BleakScanner(
detection_callback=self._detection_callback
)
self._seen_data: Set[bytes] = set()
def _detection_callback(self, device: BLEDevice, advertisement: AdvertisementData):
# Filter for Victron devices and instant readout advertisements
data = advertisement.manufacturer_data.get(0x02E1)
if not data or not data.startswith(b"\x10") or data in self._seen_data:
return
# De-duplicate advertisements
if len(self._seen_data) > 1000:
self._seen_data = set()
self._seen_data.add(data)
self.callback(device, data)
def callback(self, device: BLEDevice, data: bytes):
raise NotImplementedError()
async def start(self):
await self._scanner.start()
async def stop(self):
await self._scanner.stop()
# An ugly hack to print a class as JSON
class DeviceDataEncoder(json.JSONEncoder):
def default(self, obj):
if issubclass(obj.__class__, DeviceData):
data = {}
for name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
if name.startswith("get_"):
value = method()
if isinstance(value, Enum):
value = value.name.lower()
if value is not None:
data[name[4:]] = value
return data
class Scanner(BaseScanner):
def __init__(self, device_keys: dict[str, str] = {}):
super().__init__()
self._device_keys = {k.lower(): v for k, v in device_keys.items()}
self._known_devices: dict[str, Device] = {}
async def start(self):
logger.info(f"Reading data for {list(self._device_keys.keys())}")
await super().start()
def get_device(self, ble_device: BLEDevice, raw_data: bytes) -> Device:
address = ble_device.address.lower()
if address not in self._known_devices:
advertisement_key = self.load_key(address)
device_klass = detect_device_type(raw_data)
if not device_klass:
raise UnknownDeviceError(
f"Could not identify device type for {ble_device}"
)
self._known_devices[address] = device_klass(advertisement_key)
return self._known_devices[address]
def load_key(self, address: str) -> str:
try:
return self._device_keys[address]
except KeyError:
raise AdvertisementKeyMissingError(f"No key available for {address}")
def callback(self, ble_device: BLEDevice, raw_data: bytes):
logger.debug(
f"Received data from {ble_device.address.lower()}: {raw_data.hex()}"
)
try:
device = self.get_device(ble_device, raw_data)
except AdvertisementKeyMissingError:
return
except UnknownDeviceError as e:
logger.error(e)
return
parsed = device.parse(raw_data)
blob = {
"name": ble_device.name,
"address": ble_device.address,
"rssi": ble_device.rssi,
"payload": parsed,
}
print(json.dumps(blob, cls=DeviceDataEncoder, indent=1))
ve_string = json.dumps(blob, cls=DeviceDataEncoder, indent=1)
print(ve_string)
#MAC_filename = "this_device" + ".txt"
#print(f"MAC filename: {MAC_filename}")
this_file = open("this_device.txt", "w")
this_file.write(ve_string)
this_file.close()
print("file written")
time.sleep(3)
class DiscoveryScanner(BaseScanner):
def __init__(self) -> None:
super().__init__()
self._seen_devices: Set[str] = set()
def callback(self, device: BLEDevice, advertisement: bytes):
if device.address not in self._seen_devices:
logger.info(f"{device}")
self._seen_devices.add(device.address)
class DebugScanner(BaseScanner):
def __init__(self, address: str):
super().__init__()
self.address = address
async def start(self):
logger.info(f"Dumping advertisements from {self.address}")
await super().start()
def callback(self, device: BLEDevice, data: bytes):
if device.address.lower() == self.address.lower():
logger.info(f"{time.time():<24}: {data.hex()}")
def my_scan(device_keys: List[Tuple[str, str]]):
loop = asyncio.get_event_loop()
async def scan(keys):
scanner = Scanner(keys)
await scanner.start()
asyncio.ensure_future(scan({k: v for k, v in device_keys}))
the_end = time.time() + 5
while time.time() < the_end:
loop.stop()
loop.run_forever()
if __name__ == '__main__':
my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")])
I have tried moving from
run_forever
to
run_until_complete
This method failed spectacularly on the Rpi :( so reverted back
Why would the script behave differently between 2 machines and how can I replicate on RPi? Many thanks for any help
I suspect that what you want to do is only exit after the file has been written.
Maybe you could use the asyncio event capability? https://docs.python.org/3/library/asyncio-sync.html#event
Create an event in the global scope by placing the following at the top of the file after the imports.
file_written_event = asyncio.Event()
Then add file_written_event.set()
after the file is written. e.g.
this_file = open("this_device.txt", "w")
this_file.write(ve_string)
this_file.close()
print("file written")
file_written_event.set()
The bottom of your file would change the most. It does mostly the same, but has a while loop waiting for the event to happen before exiting.
async def my_scan(device_keys: List[Tuple[str, str]]):
async def scan(keys):
scanner = Scanner(keys)
await scanner.start()
asyncio.ensure_future(scan({k: v for k, v in device_keys}))
while not file_written_event.is_set():
await asyncio.sleep(0.1)
if __name__ == '__main__':
logging.basicConfig()
logger.setLevel(logging.DEBUG)
asyncio.run(my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")]))