Rename JBDUPS to JBDBMS, as this is more accurate
This commit is contained in:
@@ -6,14 +6,15 @@ import atexit
|
||||
import serial
|
||||
import serial.rs485
|
||||
import time
|
||||
from collections.abc import Iterator
|
||||
from dataclasses import dataclass, fields as dataclass_fields
|
||||
|
||||
from bmspy.utilities import debugger
|
||||
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField
|
||||
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, BMSField, UPS
|
||||
|
||||
|
||||
@dataclass
|
||||
class JBDUPS:
|
||||
class JBDBMS(UPS):
|
||||
bms_voltage_total_volts: BMSScalarField | None = None
|
||||
bms_current_amps: BMSScalarField | None = None
|
||||
bms_capacity_remaining_ah: BMSScalarField | None = None
|
||||
@@ -42,7 +43,7 @@ class JBDUPS:
|
||||
bms_temperature_celcius: BMSMultiField | None = None
|
||||
bms_voltage_cells_volts: BMSMultiField | None = None
|
||||
|
||||
def items(self):
|
||||
def items(self) -> Iterator[tuple[str, BMSField]]:
|
||||
"""Yield (name, field) for all non-None BMS field attributes."""
|
||||
for f in dataclass_fields(self):
|
||||
v = getattr(self, f.name)
|
||||
@@ -50,7 +51,7 @@ class JBDUPS:
|
||||
yield f.name, v
|
||||
|
||||
|
||||
def serial_cleanup(ser, debug=0):
|
||||
def serial_cleanup(ser: serial.Serial, debug: int = 0) -> None:
|
||||
if debug > 2:
|
||||
debugger("serial: cleaning up...")
|
||||
if ser.is_open:
|
||||
@@ -59,7 +60,7 @@ def serial_cleanup(ser, debug=0):
|
||||
ser.close()
|
||||
|
||||
|
||||
def initialise_serial(device, debug=0):
|
||||
def initialise_serial(device: str, debug: int = 0) -> serial.Serial:
|
||||
ser = serial.Serial(device, baudrate=9600)
|
||||
ser.parity = serial.PARITY_NONE
|
||||
ser.bytesize = serial.EIGHTBITS
|
||||
@@ -72,12 +73,12 @@ def initialise_serial(device, debug=0):
|
||||
return ser
|
||||
|
||||
|
||||
def calculate_checksum(msg):
|
||||
def calculate_checksum(msg: bytes | bytearray) -> str:
|
||||
checksum = ""
|
||||
return checksum
|
||||
|
||||
|
||||
def verify_checksum(data, checksum):
|
||||
def verify_checksum(data: bytes | bytearray, checksum: bytes) -> bool:
|
||||
# (data + length + command code) checksum, then complement, then add 1, high bit first, low bit last
|
||||
# data should have start/rw stripped
|
||||
s = 0
|
||||
@@ -88,7 +89,7 @@ def verify_checksum(data, checksum):
|
||||
return s == chk
|
||||
|
||||
|
||||
def convert_to_signed(x):
|
||||
def convert_to_signed(x: int) -> int:
|
||||
# For values below 1024, these seem to be actual results
|
||||
# For values above 1024, these seem to be encoded to account for high and negative floats
|
||||
max_uint = 1024
|
||||
@@ -98,14 +99,14 @@ def convert_to_signed(x):
|
||||
return x
|
||||
|
||||
|
||||
def bytes_to_digits(high, low):
|
||||
def bytes_to_digits(high: int, low: int) -> int:
|
||||
result = high
|
||||
result <<= 8
|
||||
result = result | low
|
||||
return result
|
||||
|
||||
|
||||
def bytes_to_date(high, low):
|
||||
def bytes_to_date(high: int, low: int) -> str:
|
||||
result = bytes_to_digits(high, low)
|
||||
day = result & 0x1F
|
||||
mon = (result >> 5) & 0x0F
|
||||
@@ -113,7 +114,7 @@ def bytes_to_date(high, low):
|
||||
return "{:04d}-{:02d}-{:02d}".format(year, mon, day)
|
||||
|
||||
|
||||
def requestMessage(ser, reqmsg, debug=0):
|
||||
def requestMessage(ser: serial.Serial, reqmsg: bytearray, debug: int = 0) -> bytes | str | bool:
|
||||
if debug > 2:
|
||||
debugger("serial: starting up monitor")
|
||||
if ser.is_open:
|
||||
@@ -172,7 +173,7 @@ def requestMessage(ser, reqmsg, debug=0):
|
||||
debugger("serial: cannot open port")
|
||||
|
||||
|
||||
def parse_03_response(response, debug=0):
|
||||
def parse_03_response(response: bytearray, debug: int = 0) -> JBDBMS | bool:
|
||||
# Response is 34 bytes:
|
||||
# 00 begin: \xDD
|
||||
# 01 r/w: \xA5
|
||||
@@ -219,7 +220,7 @@ def parse_03_response(response, debug=0):
|
||||
if data_len == 0:
|
||||
return False
|
||||
|
||||
result = JBDUPS()
|
||||
result = JBDBMS()
|
||||
|
||||
vtot = bytes_to_digits(response[4], response[5]) * 0.01
|
||||
result.bms_voltage_total_volts = BMSScalarField(
|
||||
@@ -431,7 +432,7 @@ def parse_03_response(response, debug=0):
|
||||
return result
|
||||
|
||||
|
||||
def parse_04_response(response, debug=0):
|
||||
def parse_04_response(response: bytearray, debug: int = 0) -> BMSMultiField | bool:
|
||||
# Response is 7 + cells * 2 bytes:
|
||||
# 00 begin: \xDD
|
||||
# 01 r/w: \xA5
|
||||
@@ -496,7 +497,7 @@ def parse_04_response(response, debug=0):
|
||||
)
|
||||
|
||||
|
||||
def collect_data(ser, debug=0):
|
||||
def collect_data(ser: serial.Serial, debug: int = 0) -> JBDBMS | bool:
|
||||
# Request is 7 bytes:
|
||||
# \xDD for start
|
||||
# \xA5 for read, \x5A for write
|
||||
+1
-1
@@ -15,7 +15,7 @@ from dataclasses import asdict as dataclass_asdict
|
||||
from typing import Any, NoReturn
|
||||
|
||||
from bmspy.utilities import debugger
|
||||
from bmspy.jbd_ups import collect_data, initialise_serial
|
||||
from bmspy.jbd_bms import collect_data, initialise_serial
|
||||
|
||||
# Expected kernel log output when the USB-serial adapter is plugged in:
|
||||
# usb 1-1.4: new full-speed USB device number 4 using xhci_hcd
|
||||
|
||||
Reference in New Issue
Block a user