From deb6b2bdcc9c8232fc39491e223191a5ab02a25a Mon Sep 17 00:00:00 2001 From: Timothy Allen Date: Sat, 2 May 2026 18:13:33 +0200 Subject: [PATCH] Add a new DeviceState class to capture metadata from a reading --- bmspy/server.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/bmspy/server.py b/bmspy/server.py index ca6acb1..127e39c 100755 --- a/bmspy/server.py +++ b/bmspy/server.py @@ -11,12 +11,21 @@ import atexit import signal import json import struct -from dataclasses import asdict as dataclass_asdict +import serial +from dataclasses import asdict as dataclass_asdict, dataclass from typing import Any, NoReturn from bmspy.utilities import debugger +from bmspy.classes import UPS from bmspy.jbd_bms import collect_data, initialise_serial + +@dataclass +class DeviceState: + ser: serial.Serial + data: UPS | None = None + timestamp: float = 0.0 + # Expected kernel log output when the USB-serial adapter is plugged in: # usb 1-1.4: new full-speed USB device number 4 using xhci_hcd # usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00 @@ -161,17 +170,13 @@ def main(): debug = args.verbose device_list = args.devices or ["/dev/ttyUSB0"] - ups_devices = {} + ups_devices: dict[str, DeviceState] = {} for device_str in device_list: name, path = parse_device(device_str) if name in ups_devices: debugger("server: duplicate UPS name '{}', skipping {}".format(name, path)) continue - ups_devices[name] = { - "ser": initialise_serial(path, debug), - "data": None, - "timestamp": 0, - } + ups_devices[name] = DeviceState(ser=initialise_serial(path, debug)) if debug > 0: print("server: registered UPS '{}' on {}".format(name, path)) @@ -297,17 +302,17 @@ def main(): if debug > 0: debugger( "reading data for '{}', timestamp={}, time={}".format( - name, device["timestamp"], time.time() + name, device.timestamp, time.time() ) ) # only get new data five seconds after the last read - if device["timestamp"] <= time.time() - 5: - device["data"] = None - while not device["data"]: - device["data"] = collect_data(device["ser"], debug) + if device.timestamp <= time.time() - 5: + device.data = None + while not device.data: + device.data = collect_data(device.ser, debug) time.sleep(1) - device["timestamp"] = time.time() - result[name] = device["data"] + device.timestamp = time.time() + result[name] = device.data send_response(connection, result, client, debug)