Refactor data dict into a proper JBDUPS class.

This commit is contained in:
2026-05-02 09:00:52 +02:00
parent b672c9f5ae
commit 4b13450b65
3 changed files with 691 additions and 565 deletions
-4
View File
@@ -64,10 +64,6 @@ def influxdb_create_snapshot(data, debug=0):
now = datetime.datetime.now(datetime.timezone.utc).isoformat() now = datetime.datetime.now(datetime.timezone.utc).isoformat()
for kind, contains in data.items(): for kind, contains in data.items():
# discard bmspy metadata
if kind == 'client' or kind == 'timestamp':
break
helpmsg = '' helpmsg = ''
if contains.get('help'): if contains.get('help'):
helpmsg = contains.get('help') helpmsg = contains.get('help')
+560
View File
@@ -0,0 +1,560 @@
#!/usr/bin/env python3
#
# Communicate with a JBD/SZLLT BMS and return basic information
#
import atexit
import serial
import serial.rs485
import time
from dataclasses import dataclass, fields as dataclass_fields
@dataclass
class BMSScalarField:
"""Single numeric or boolean measurement"""
help: str
raw_value: float | bool
value: str
units: str | None = None
def get(self, key, default=None):
return getattr(self, key, default)
@dataclass
class BMSMultiField:
"""Indexed collection (cells, temperature sensors)"""
help: str
label: str
raw_values: dict
values: dict
units: str | None = None
def get(self, key, default=None):
return getattr(self, key, default)
@dataclass
class BMSInfoField:
"""Text / date info (non-numeric)"""
help: str
info: str
def get(self, key, default=None):
return getattr(self, key, default)
@dataclass
class JBDUPS:
bms_voltage_total_volts: BMSScalarField | None = None
bms_current_amps: BMSScalarField | None = None
bms_capacity_remaining_ah: BMSScalarField | None = None
bms_capacity_nominal_ah: BMSScalarField | None = None
bms_charge_cycles: BMSScalarField | None = None
bms_manufacture_date: BMSInfoField | None = None
bms_protection_sop_bool: BMSScalarField | None = None
bms_protection_sup_bool: BMSScalarField | None = None
bms_protection_wgop_bool: BMSScalarField | None = None
bms_protection_wgup_bool: BMSScalarField | None = None
bms_protection_cotp_bool: BMSScalarField | None = None
bms_protection_cutp_bool: BMSScalarField | None = None
bms_protection_dotp_bool: BMSScalarField | None = None
bms_protection_dutp_bool: BMSScalarField | None = None
bms_protection_cocp_bool: BMSScalarField | None = None
bms_protection_docp_bool: BMSScalarField | None = None
bms_protection_scp_bool: BMSScalarField | None = None
bms_protection_fdic_bool: BMSScalarField | None = None
bms_protection_slmos_bool: BMSScalarField | None = None
bms_capacity_charge_ratio: BMSScalarField | None = None
bms_charge_is_charging: BMSScalarField | None = None
bms_charge_is_discharging: BMSScalarField | None = None
bms_cell_number: BMSScalarField | None = None
bms_cells_balancing: BMSMultiField | None = None
bms_temperature_sensor_num: BMSScalarField | None = None
bms_temperature_celcius: BMSMultiField | None = None
bms_voltage_cells_volts: BMSMultiField | None = None
def items(self):
"""Yield (name, field) for all non-None BMS field attributes."""
for f in dataclass_fields(self):
v = getattr(self, f.name)
if v is not None:
yield f.name, v
def serial_cleanup(ser, debug=0):
if debug > 2:
print("serial: cleaning up...")
if ser.is_open:
ser.reset_input_buffer()
ser.reset_output_buffer()
ser.close()
def initialise_serial(device, debug=0):
ser = serial.Serial(device, baudrate=9600)
ser.parity = serial.PARITY_NONE
ser.bytesize = serial.EIGHTBITS
ser.stopbits = serial.STOPBITS_ONE
ser.timeout = 1
ser.writeTimeout = 1
atexit.register(serial_cleanup, ser, debug)
return ser
def calculate_checksum(msg):
checksum = ""
return checksum
def verify_checksum(data, checksum):
# (data + length + command code) checksum, then complement, then add 1, high bit first, low bit last
# data should have start/rw stripped
s = 0
for i in data:
s += i
s = (s ^ 0xFFFF) + 1
chk = bytes_to_digits(checksum[0], checksum[1])
return s == chk
def convert_to_signed(x):
# For values below 1024, these seem to be actual results
# For values above 1024, these seem to be encoded to account for high and negative floats
max_uint = 1024
if x >= max_uint:
return (x - 2**9) % 2**10 - 2**9
else:
return x
def bytes_to_digits(high, low):
result = high
result <<= 8
result = result | low
return result
def bytes_to_date(high, low):
result = bytes_to_digits(high, low)
day = result & 0x1F
mon = (result >> 5) & 0x0F
year = 2000 + (result >> 9)
return "{:04d}-{:02d}-{:02d}".format(year, mon, day)
def requestMessage(ser, reqmsg, debug=0):
if debug > 2:
print("serial: starting up monitor")
if ser.is_open:
ser.close()
try:
ser.open()
except Exception as e:
print("serial: error open port: {0}".format(str(e)))
return False
if ser.is_open:
try:
# Resetting once alone doesn't seem to do the trick when we discarded data
# on a previous run
for i in range(2):
ser.reset_input_buffer()
ser.reset_output_buffer()
if debug > 0:
print(
"serial: write data: {0}".format(
"".join("0x{:02x} ".format(x) for x in reqmsg)
)
)
w = ser.write(reqmsg)
if debug > 2:
print("serial: bytes written: {0}".format(w))
if w != len(reqmsg):
print(
"serial ERROR: {0} bytes written, {1} expected.".format(
w, len(reqmsg)
)
)
return False
wait_time = 0
while ser.in_waiting == 0:
if wait_time > 2:
serial_cleanup(ser, debug)
return ""
if debug > 2:
print("serial: waiting for data...")
time.sleep(0.5)
wait_time += 1
if debug > 1:
print("serial: waiting reading: {0}".format(ser.in_waiting))
response = ser.read_until(b"\x77")
if len(response) == 0:
return ""
if debug > 0:
print("serial: read data: {0}".format(response.hex()))
serial_cleanup(ser, debug)
return response
except Exception as e:
print("serial: error communicating: {0}".format(str(e)))
else:
print("serial: cannot open port")
def parse_03_response(response, debug=0):
# Response is 34 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
# 02 status: \x00 = correct; \x80 = incorrect
# 03 length (usually 27)
# 04 data (size of length)
# ...
# length+4 checksum
# length+5 checksum
# length+6 end \x77
if bytes([response[0]]) != b"\xdd":
print("parse_03_response ERROR: first byte not found: {0}".format(response[0]))
return False
if bytes([response[2]]) == b"\x80":
print(
"parse_03_response ERROR: error byte returned from BMS: {0}".format(
response[2]
)
)
return False
data_len = response[3]
if debug > 2:
print("parse_03_response: data length (trimming 4 bytes): {0}".format(data_len))
# The checksum is two bytes, offset by data_len + 4
# Five bytes at the front of data: begin; rw; status, command; length
# The checksum should check command, length, and data: [3] to [3+data_len+1]
first = data_len + 4
second = data_len + 5
if second > len(response):
print("parse_03_response ERROR: primary response checksum not found")
return False
checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum):
print("parse_03_response ERROR: failed to validate received checksum")
return False
if data_len == 0:
return False
result = JBDUPS()
vtot = bytes_to_digits(response[4], response[5]) * 0.01
result.bms_voltage_total_volts = BMSScalarField(
help="Total Voltage", raw_value=vtot, value="{:.2f}".format(vtot), units="V"
)
if debug > 1:
print(" Total voltage: {:.2f}V".format(vtot))
current = convert_to_signed(bytes_to_digits(response[6], response[7])) * 0.01
result.bms_current_amps = BMSScalarField(
help="Current", raw_value=current, value="{:.2f}".format(current), units="A"
)
if debug > 1:
print(" Current: {:.2f}A".format(current))
res_cap = bytes_to_digits(response[8], response[9]) * 0.01
nom_cap = bytes_to_digits(response[10], response[11]) * 0.01
result.bms_capacity_remaining_ah = BMSScalarField(
help="Remaining Capacity",
raw_value=res_cap,
value="{:.2f}".format(res_cap),
units="Ah",
)
result.bms_capacity_nominal_ah = BMSScalarField(
help="Nominal Capacity",
raw_value=nom_cap,
value="{:.2f}".format(nom_cap),
units="Ah",
)
if debug > 1:
print(" Remaining capacity: {:.2f}Ah".format(res_cap))
print(" Nominal capacity: {:.2f}Ah".format(nom_cap))
cycle_times = bytes_to_digits(response[12], response[13])
result.bms_charge_cycles = BMSScalarField(
help="Charge Cycles", raw_value=cycle_times, value="{0}".format(cycle_times)
)
if debug > 1:
print(" Cycle times: {0}".format(cycle_times))
man_date = bytes_to_date(response[14], response[15])
result.bms_manufacture_date = BMSInfoField(
help="Date of Manufacture", info="{0}".format(man_date)
)
if debug > 1:
print(" Manufacturing date: {0}".format(man_date))
cells = response[25]
result.bms_cell_number = BMSScalarField(
help="Cells", raw_value=cells, value="{0}".format(cells)
)
if debug > 1:
print(" Cells: {0}S".format(cells))
balance_state_high = bytes_to_digits(response[16], response[17]) # 1S to 16S
balance_state_low = bytes_to_digits(response[18], response[19]) # 17S to 32S
# 1 bit per 4S (2 bytes = 16S); in 4S, we should expect:
# 0x0 (no cells balancing) 0
# 0x1 (cell 1 balancing) 1
# 0x2 (cell 2 balancing) 2
# 0x3 (cells 1 + 2 balancing) 3
# 0x4 (cell 3 balancing) 4
# 0x5 (cells 1 + 3 balancing) 5
# 0x6 (cells 2 + 3 balancing) 6
# 0x7 (cells 1 + 2 + 3 balancing) 7
# 0x8 (cell 4 balancing) 8
# 0x9 (cells 1 + 4 balancing) 9
# 0xA (cells 2 + 4 balancing) 10
# 0xB (cells 1 + 2 + 4 balancing) 11
# 0xC (cells 3 + 4 balancing) 12
# 0xD (cells 1 + 3 + 4 balancing) 13
# 0xE (cells 2 + 3 + 4 balancing) 14
# 0xF (cells 1 + 2 + 3 + 4 balancing) 15
raw_balancing = {}
str_balancing = {}
for cell in range(cells):
# Cells 1-16 are recorded in balance_state_low, 17-32 in balance_state_high;
# hilo_cell records the offset relative to the state group
if cell >= 16:
state = balance_state_high
hilo_cell = cell - 16
else:
state = balance_state_low
hilo_cell = cell
# Cells are recorded as groups of 4 bits (0x0-0xF) per 4 cells
g = int(hilo_cell / 4)
b = 2 ** (hilo_cell - (g * 4))
balancing = bool((state >> g) & b)
raw_balancing[cell + 1] = balancing
str_balancing[cell + 1] = "{0}".format(int(balancing))
if debug > 1:
print(" Balancing cell {0}: {1}".format(cell, balancing))
result.bms_cells_balancing = BMSMultiField(
help="Cells balancing",
label="cell",
raw_values=raw_balancing,
values=str_balancing,
)
protection_state = bytes_to_digits(response[20], response[21])
sop = protection_state & 1
sup = protection_state & 2
gop = protection_state & 4
gup = protection_state & 8
cotp = protection_state & 16
cutp = protection_state & 32
dotp = protection_state & 64
dutp = protection_state & 128
cocp = protection_state & 256
docp = protection_state & 512
scp = protection_state & 1024
fdic = protection_state & 2048
slm = protection_state & 4096
def _prot(help, flag):
return BMSScalarField(
help=help, raw_value=bool(flag), value="{0}".format(int(bool(flag)))
)
result.bms_protection_sop_bool = _prot("Single overvoltage protection", sop)
result.bms_protection_sup_bool = _prot("Single undervoltage protection", sup)
result.bms_protection_wgop_bool = _prot("Whole group overvoltage protection", gop)
result.bms_protection_wgup_bool = _prot("Whole group undervoltage protection", gup)
result.bms_protection_cotp_bool = _prot(
"Charging over-temperature protection", cotp
)
result.bms_protection_cutp_bool = _prot(
"Charging under-temperature protection", cutp
)
result.bms_protection_dotp_bool = _prot(
"Discharging over-temperature protection", dotp
)
result.bms_protection_dutp_bool = _prot("Discharging under-protection", dutp)
result.bms_protection_cocp_bool = _prot("Charging over-current protection", cocp)
result.bms_protection_docp_bool = _prot("Discharging over-current protection", docp)
result.bms_protection_scp_bool = _prot("Short-circuit protection", scp)
result.bms_protection_fdic_bool = _prot("Front detection IC error", fdic)
result.bms_protection_slmos_bool = _prot("Software lock MOS", slm)
if debug > 2:
print(" Protection state: {0}".format(protection_state))
for attr in (
"sop",
"sup",
"gop",
"gup",
"cotp",
"cutp",
"dotp",
"dutp",
"cocp",
"docp",
"scp",
"fdic",
"slm",
):
val = locals()[attr]
print(" {}: {}".format(attr, bool(val)))
rsoc = response[23] * 0.01
result.bms_capacity_charge_ratio = BMSScalarField(
help="Percent Charge", raw_value=rsoc, value="{0}".format(rsoc), units=""
)
if debug > 1:
print(" Capacity remaining: {0}%".format(rsoc * 100))
# bit0 = charging; bit1 = discharging; 0 = MOS closing; 1 = MOS opening
control_status = response[24]
result.bms_charge_is_charging = BMSScalarField(
help="MOSFET charging",
raw_value=bool(control_status & 1),
value="{0}".format(int(bool(control_status & 1))),
)
result.bms_charge_is_discharging = BMSScalarField(
help="MOSFET discharging",
raw_value=bool((control_status >> 1) & 1),
value="{0}".format(int(bool((control_status >> 1) & 1))),
)
if debug > 1:
print(" MOSFET charging: {0}".format("yes" if (control_status & 1) else "no"))
print(
" MOSFET discharging: {0}".format(
"yes" if ((control_status >> 1) & 1) else "no"
)
)
ntc_num = response[26]
temperatures = [
(bytes_to_digits(response[27 + (2 * i)], response[28 + (2 * i)]) - 2731) * 0.1
for i in range(ntc_num)
]
result.bms_temperature_sensor_num = BMSScalarField(
help="Temperature Sensors", raw_value=ntc_num, value="{0}".format(ntc_num)
)
result.bms_temperature_celcius = BMSMultiField(
help="Temperature",
label="sensor",
raw_values={i + 1: temp for i, temp in enumerate(temperatures)},
values={i + 1: "{:.2f}".format(temp) for i, temp in enumerate(temperatures)},
units="°C",
)
if debug > 1:
print(" Number of temperature sensors: {0}".format(ntc_num))
for i, temp in enumerate(temperatures):
print(" Temperature sensor {:d}: {:.2f}°C".format(i + 1, temp))
return result
def parse_04_response(response, debug=0):
# Response is 7 + cells * 2 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
# 02 status: \x00 = correct; \x80 = incorrect
# 03 length (usually 8)
# 04 data (size of length)
# ...
# length+4 checksum
# length+5 checksum
# length+6 end \x77
if bytes([response[0]]) != b"\xdd":
print("parse_04_response ERROR: first byte not found: {0}".format(response[0]))
return False
if bytes([response[2]]) == b"\x80":
print(
"parse_04_response ERROR: error byte returned from BMS: {0}".format(
response[2]
)
)
return False
data_len = response[3]
if debug > 2:
print(" Data length (trimming 4 bytes): {0}".format(data_len))
# The checksum is two bytes, offset by data_len + 4
# Five bytes at the front of data: begin; rw; status, command; length
# The checksum should check command, length, and data: [3] to [3+data_len+1]
first = data_len + 4
second = data_len + 5
if second > len(response):
print("parse_04_response ERROR: cell voltage checksum not found")
return False
checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum):
print("parse_04_response ERROR: failed to validate received checksum")
return False
if data_len == 0:
return False
raw_values = {}
str_values = {}
for cell in range(int(data_len / 2)):
lo = (cell * 2) + 4
hi = (cell * 2) + 5
cellv = bytes_to_digits(response[lo], response[hi]) * 0.001
raw_values[cell + 1] = cellv
str_values[cell + 1] = "{:.3f}".format(cellv)
if debug > 1:
print(" Cell {:.0f}: {:.3f}V".format(cell + 1, cellv))
return BMSMultiField(
help="Cell Voltages",
label="cell",
raw_values=raw_values,
values=str_values,
units="V",
)
def collect_data(ser, debug=0):
# Request is 7 bytes:
# \xDD for start
# \xA5 for read, \x5A for write
# \x03 for regular info; \x04 for individual voltages
# \x77 ends
reqmsg = bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
response_03 = requestMessage(ser, reqmsg, debug)
if len(response_03) == 0:
if debug > 0:
print("collect_data: Error retrieving BMS info. Trying again...")
return False
response_03 = bytearray(response_03)
reqmsg = bytearray([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77])
response_04 = requestMessage(ser, reqmsg, debug)
if len(response_04) == 0:
if debug > 0:
print("collect_data: Error retrieving BMS info. Trying again...")
return False
response_04 = bytearray(response_04)
result = parse_03_response(response_03, debug)
if result is False:
return False
cell_field = parse_04_response(response_04, debug)
if cell_field is False:
return False
result.bms_voltage_cells_volts = cell_field
return result
+128 -558
View File
@@ -1,19 +1,20 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# #
# Communicate with a JBD/SZLLT BMS and return basic information # Daemon: listens on a Unix socket and serves JBD BMS data to clients
# in order to shutdown equipment when voltage levels drop or remaining
# capacity gets low
# #
import os, sys, stat, time import os
import sys
import stat
import time
import atexit
import signal
import json import json
import atexit, signal import struct
import serial, serial.rs485 from dataclasses import asdict as dataclass_asdict
import struct, json
import pprint
connected_clients = list() from bmspy.jbd_ups import collect_data, initialise_serial
current_data = dict()
# Expected kernel log output when the USB-serial adapter is plugged in:
# usb 1-1.4: new full-speed USB device number 4 using xhci_hcd # usb 1-1.4: new full-speed USB device number 4 using xhci_hcd
# usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00 # usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00
# usb 1-1.4: New USB device strings: Mfr=1, Product=2, SerialNumber=3 # usb 1-1.4: New USB device strings: Mfr=1, Product=2, SerialNumber=3
@@ -27,514 +28,28 @@ current_data = dict()
# ftdi_sio 1-1.4:1.0: FTDI USB Serial Device converter detected # ftdi_sio 1-1.4:1.0: FTDI USB Serial Device converter detected
# usb 1-1.4: Detected FT232RL # usb 1-1.4: Detected FT232RL
# usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0 # usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0
# usb 1-1.4: usbfs: interface 0 claimed by ftdi_sio while 'python3' sets config #1
connected_clients = list()
current_data = None
# Catch systemd signals
def signalHandler(): def signalHandler():
raise SystemExit('terminating') raise SystemExit("terminating")
''' Clean up socket '''
def socket_cleanup(socket_path, debug=0): def socket_cleanup(socket_path, debug=0):
os.unlink(socket_path) os.unlink(socket_path)
''' Clean up serial port '''
def serial_cleanup(ser, debug=0):
if debug > 2:
print("serial: cleaning up...")
if ser.is_open:
ser.reset_input_buffer() # flush input buffer, discarding all its contents
ser.reset_output_buffer() # flush output buffer, aborting current output
ser.close()
def initialise_serial(device, debug=0):
# TODO: ensure ttyUSB0 points to idVendor=0403, idProduct=6001
# with serial.tools.list_ports.ListPortInfo
# python3 -m serial.tools.list_ports USB
ser = serial.Serial(device, baudrate=9600)
ser.parity = serial.PARITY_NONE # set parity check: no parity
ser.bytesize = serial.EIGHTBITS # number of bits per bytes
ser.stopbits = serial.STOPBITS_ONE # number of stop bits
ser.timeout = 1 # timeout block read
ser.writeTimeout = 1 # timeout for write
atexit.register(serial_cleanup, ser, debug)
return ser
def calculate_checksum(msg):
checksum = ''
return checksum
def verify_checksum(data, checksum):
# (data + length + command code) checksum, then complement, then add 1, high bit first, low bit last
# data should have start/rw stripped
s = 0
for i in data:
s += i
s = (s ^ 0xFFFF) + 1
chk = bytes_to_digits(checksum[0], checksum[1])
return s == chk
def convert_to_signed(x):
# For values below 1024, these seem to be actual results
# For values above 1024, these seem to be encoded to account for high and negative floats
max_uint = 1024
if x >= max_uint:
return (x - 2**9) % 2**10 - 2**9
else:
return x
def bytes_to_digits(high, low):
result = high
result <<= 8
result = result | low
return result
def bytes_to_date(high, low):
result= bytes_to_digits(high, low)
day = result & 0x1f
mon = (result >> 5) & 0x0f
year = 2000 + (result >> 9)
return "{:04d}-{:02d}-{:02d}".format(year, mon, day)
# takes a serial object and a message, returns a response
def requestMessage(ser, reqmsg, debug=0):
if debug > 2:
print('serial: starting up monitor')
if ser.is_open:
ser.close()
try:
ser.open()
except Exception as e:
print("serial: error open port: {0}".format(str(e)))
return False
if ser.is_open:
try:
# Resetting once alone doesn't seem to do the trick when we discarded data
# on a previous run
for i in range(2):
ser.reset_input_buffer() # flush input buffer, discarding all its contents
ser.reset_output_buffer() # flush output buffer, aborting current output
if debug > 0:
print("serial: write data: {0}".format("".join('0x{:02x} '.format(x) for x in reqmsg)))
w = ser.write(reqmsg)
if debug > 2:
print("serial: bytes written: {0}".format(w))
#time.sleep(1)
if w != len(reqmsg):
print("serial ERROR: {0} bytes written, {1} expected.".format(w, len(reqmsg)))
return False
wait_time = 0
while ser.in_waiting == 0:
# Return an empty string if we end up waiting too long
if wait_time > 2:
serial_cleanup(ser, debug)
return ''
if debug > 2:
print("serial: waiting for data...")
time.sleep(0.5)
wait_time += 1
if debug > 1:
print("serial: waiting reading: {0}".format(ser.in_waiting))
response = ser.read_until(b'\x77')
# Return an empty string if the read timed out or returned nothing
if len(response) == 0:
return ''
if debug > 0:
print("serial: read data: {0}".format(response.hex()))
serial_cleanup(ser, debug)
return response
except Exception as e:
print("serial: error communicating: {0}".format(str(e)))
else:
print("serial: cannot open port")
def parse_03_response(response, debug=0):
data = dict()
# Response is 34 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
# 02 status: \x00 = correct; \x80 = incorrect
# 03 length (usually 27)
# 04 data (size of length)
# ...
# length+4 checksum
# length+5 checksum
# length+6 end \x77
if bytes([response[0]]) != b'\xdd':
print("parse_03_response ERROR: first byte not found: {0}".format(response[0]))
return False
if bytes([response[2]]) == b'\x80':
print("parse_03_response ERROR: error byte returned from BMS: {0}".format(response[2]))
return False
data_len = response[3]
if debug > 2:
print("parse_03_response: data length (trimming 4 bytes): {0}".format(data_len))
# The checksum is two bytes, offset by data_len + 4
# Five bytes at the front of data
# begin; rw; status, command; length
# The checksum should check command, length, and data: [3] to [3+data_len+1]
first = data_len + 4
second = data_len + 5
if second > len(response):
print("parse_03_response ERROR: primary response checksum not found")
return False;
checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum):
print("parse_03_response ERROR: failed to validate received checksum")
return False
if data_len > 0:
vtot = bytes_to_digits(response[4], response[5]) * 0.01
data['bms_voltage_total_volts'] = dict()
data['bms_voltage_total_volts']['help'] = "Total Voltage"
data['bms_voltage_total_volts']['raw_value'] = vtot
data['bms_voltage_total_volts']['value'] = "{:.2f}".format(vtot)
data['bms_voltage_total_volts']['units'] = "V"
if debug > 1:
print(" Total voltage: {:.2f}V".format(vtot))
current = bytes_to_digits(response[6], response[7])
current = convert_to_signed(current) * 0.01
data['bms_current_amps'] = dict()
data['bms_current_amps']['help'] = "Current"
data['bms_current_amps']['raw_value'] = current
data['bms_current_amps']['value'] = "{:.2f}".format(current)
data['bms_current_amps']['units'] = "A"
if debug > 1:
print(" Current: {:.2f}A".format(current))
res_cap = bytes_to_digits(response[8], response[9]) * 0.01
nom_cap = bytes_to_digits(response[10], response[11]) * 0.01
data['bms_capacity_remaining_ah'] = dict()
data['bms_capacity_remaining_ah']['help'] = "Remaining Capacity"
data['bms_capacity_remaining_ah']['raw_value'] = res_cap
data['bms_capacity_remaining_ah']['value'] = "{:.2f}".format(res_cap)
data['bms_capacity_remaining_ah']['units'] = "Ah"
data['bms_capacity_nominal_ah'] = dict()
data['bms_capacity_nominal_ah']['help'] = "Nominal Capacity"
data['bms_capacity_nominal_ah']['raw_value'] = nom_cap
data['bms_capacity_nominal_ah']['value'] = "{:.2f}".format(nom_cap)
data['bms_capacity_nominal_ah']['units'] = "Ah"
if debug > 1:
print(" Remaining capacity: {:.2f}Ah".format(res_cap))
print(" Nominal capacity: {:.2f}Ah".format(nom_cap))
cycle_times = bytes_to_digits(response[12], response[13])
data['bms_charge_cycles'] = dict()
data['bms_charge_cycles']['help'] = "Charge Cycles"
data['bms_charge_cycles']['raw_value'] = cycle_times
data['bms_charge_cycles']['value'] = "{0}".format(cycle_times)
if debug > 1:
print(" Cycle times: {0}".format(cycle_times))
man_date = bytes_to_date(response[14], response[15])
data['bms_manufacture_date'] = dict()
data['bms_manufacture_date']['help'] = "Date of Manufacture"
data['bms_manufacture_date']['info'] = "{0}".format(man_date)
if debug > 1:
print(" Manufacturing date: {0}".format(man_date))
cells = response[25] # 4S
data['bms_cell_number'] = dict()
data['bms_cell_number']['help'] = "Cells"
data['bms_cell_number']['raw_value'] = cells
data['bms_cell_number']['value'] = "{0}".format(cells)
if debug > 1:
print(" Cells: {0}S".format(cells))
balance_state_high = bytes_to_digits(response[16], response[17]) # 1S to 16S
balance_state_low = bytes_to_digits(response[18], response[19]) # 17S to 32S
# 1 bit per 4S (2 bytes = 16S); in 4S, we should expect:
# 0x0 (no cells balancing) 0
# 0x1 (cell 1 balancing) 1
# 0x2 (cell 2 balancing) 2
# 0x3 (cells 1 + 2 balancing) 3
# 0x4 (cell 3 balancing) 4
# 0x5 (cells 1 + 3 balancing) 5
# 0x6 (cells 2 + 3 balancing) 6
# 0x7 (cells 1 + 2 + 3 balancing) 7
# 0x8 (cell 4 balancing) 8
# 0x9 (cells 1 + 4 balancing) 9
# 0xA (cells 2 + 4 balancing) 10
# 0xB (cells 1 + 2 + 4 balancing) 11
# 0xC (cells 3 + 4 balancing) 12
# 0xD (cells 1 + 3 + 4 balancing) 13
# 0xE (cells 2 + 3 + 4 balancing) 14
# 0xF (cells 1 + 2 + 3 + 4 balancing) 15
#data["Balancing"] = dict()
data['bms_cells_balancing'] = dict()
data['bms_cells_balancing']['help'] = "Cells balancing"
data['bms_cells_balancing']['label'] = 'cell'
data['bms_cells_balancing']['raw_values'] = dict()
data['bms_cells_balancing']['values'] = dict()
for cell in range(cells):
# Cells from 1 to 16 are recorded in balance_state_low,
# and from 17 to 32 in balance_state_high; hilo_cell records the offset
# relative to the state group
if cell >= 16:
state = balance_state_high
hilo_cell = cell - 16
else:
state = balance_state_low
hilo_cell = cell
# Cells are recorded as groups of 4 bits (0x0-0xF) per 4 cells
g = int(hilo_cell / 4)
b = 2**(hilo_cell - (g * 4 ))
data['bms_cells_balancing']['raw_values'][cell+1] = bool((state >> g) & b)
data['bms_cells_balancing']['values'][cell+1] = "{0}".format(int(bool((state >> g) & b)))
if debug > 1:
print(" Balancing cell {0}: {1}".format(cell, bool((state >> g & b))))
protection_state = bytes_to_digits(response[20], response[21])
sop = protection_state & 1
sup = protection_state & 2
gop = protection_state & 4
gup = protection_state & 8
cotp = protection_state & 16
cutp = protection_state & 32
dotp = protection_state & 64
dutp = protection_state & 128
cocp = protection_state & 256
docp = protection_state & 512
scp = protection_state & 1024
fdic = protection_state & 2048
slm = protection_state & 4096
data['bms_protection_sop_bool'] = dict()
data['bms_protection_sop_bool']['help'] = "Single overvoltage protection"
data['bms_protection_sop_bool']['raw_value'] = bool(sop)
data['bms_protection_sop_bool']['value'] = "{0}".format(int(bool(sop)))
data['bms_protection_sup_bool'] = dict()
data['bms_protection_sup_bool']['help'] = "Single undervoltage protection"
data['bms_protection_sup_bool']['raw_value'] = bool(sup)
data['bms_protection_sup_bool']['value'] = "{0}".format(int(bool(sup)))
data['bms_protection_wgop_bool'] = dict()
data['bms_protection_wgop_bool']['help'] = "Whole group overvoltage protection"
data['bms_protection_wgop_bool']['raw_value'] = bool(gop)
data['bms_protection_wgop_bool']['value'] = "{0}".format(int(bool(gop)))
data['bms_protection_wgup_bool'] = dict()
data['bms_protection_wgup_bool']['help'] = "Whole group undervoltage protection"
data['bms_protection_wgup_bool']['raw_value'] = bool(gup)
data['bms_protection_wgup_bool']['value'] = "{0}".format(int(bool(gup)))
data['bms_protection_cotp_bool'] = dict()
data['bms_protection_cotp_bool']['help'] = "Charging over-temperature protection"
data['bms_protection_cotp_bool']['raw_value'] = bool(cotp)
data['bms_protection_cotp_bool']['value'] = "{0}".format(int(bool(cotp)))
data['bms_protection_cutp_bool'] = dict()
data['bms_protection_cutp_bool']['help'] = "Charging under-temperature protection"
data['bms_protection_cutp_bool']['raw_value'] = bool(cutp)
data['bms_protection_cutp_bool']['value'] = "{0}".format(int(bool(cutp)))
data['bms_protection_dotp_bool'] = dict()
data['bms_protection_dotp_bool']['help'] = "Discharging over-temperature protection"
data['bms_protection_dotp_bool']['raw_value'] = bool(dotp)
data['bms_protection_dotp_bool']['value'] = "{0}".format(int(bool(dotp)))
data['bms_protection_dutp_bool'] = dict()
data['bms_protection_dutp_bool']['help'] = "Discharging under-protection"
data['bms_protection_dutp_bool']['raw_value'] = bool(dutp)
data['bms_protection_dutp_bool']['value'] = "{0}".format(int(bool(dutp)))
data['bms_protection_cocp_bool'] = dict()
data['bms_protection_cocp_bool']['help'] = "Charging over-current protection"
data['bms_protection_cocp_bool']['raw_value'] = bool(cocp)
data['bms_protection_cocp_bool']['value'] = "{0}".format(int(bool(cocp)))
data['bms_protection_docp_bool'] = dict()
data['bms_protection_docp_bool']['help'] = "Discharging over-current protection"
data['bms_protection_docp_bool']['raw_value'] = bool(docp)
data['bms_protection_docp_bool']['value'] = "{0}".format(int(bool(docp)))
data['bms_protection_scp_bool'] = dict()
data['bms_protection_scp_bool']['help'] = "Short-circuit protection"
data['bms_protection_scp_bool']['raw_value'] = bool(scp)
data['bms_protection_scp_bool']['value'] = "{0}".format(int(bool(scp)))
data['bms_protection_fdic_bool'] = dict()
data['bms_protection_fdic_bool']['help'] = "Front detection IC error"
data['bms_protection_fdic_bool']['raw_value'] = bool(fdic)
data['bms_protection_fdic_bool']['value'] = "{0}".format(int(bool(fdic)))
data['bms_protection_slmos_bool'] = dict()
data['bms_protection_slmos_bool']['help'] = "Software lock MOS"
data['bms_protection_slmos_bool']['raw_value'] = bool(slm)
data['bms_protection_slmos_bool']['value'] = "{0}".format(int(bool(slm)))
if debug > 2:
print(" Protection state: {0}".format(protection_state))
print(" Single overvoltage protection: {0}".format(bool(sop)))
print(" Single undervoltage protection: {0}".format(bool(sup)))
print(" Whole group overvoltage protection: {0}".format(bool(gop)))
print(" Whole group undervoltage protection: {0}".format(bool(gup)))
print(" Charging over-temperature protection: {0}".format(bool(cotp)))
print(" Charging under-temperature protection: {0}".format(bool(cutp)))
print(" Discharging over-temperature protection: {0}".format(bool(dotp)))
print(" Discharging under-protection: {0}".format(bool(dutp)))
print(" Charging over-current protection: {0}".format(bool(cocp)))
print(" Discharging over-current protection: {0}".format(bool(docp)))
print(" Short-circuit protection: {0}".format(bool(scp)))
print(" Front detection IC error: {0}".format(bool(fdic)))
print(" Software lock MOS: {0}".format(bool(slm)))
software_version = bytes([response[22]])
# percent of capacity remaining, converted to a per mille ratio between 0 and 1
rsoc = response[23] * 0.01
data['bms_capacity_charge_ratio'] = dict()
data['bms_capacity_charge_ratio']['help'] = "Percent Charge"
data['bms_capacity_charge_ratio']['raw_value'] = rsoc
data['bms_capacity_charge_ratio']['value'] = "{0}".format(rsoc)
data['bms_capacity_charge_ratio']['units'] = "\u2030"
if debug > 1:
print(" Capacity remaining: {0}%".format(rsoc * 100))
# bit0 = charging; bit1 = discharging; 0 = MOS closing; 1 = MOS opening
control_status = response[24]
data['bms_charge_is_charging'] = dict()
data['bms_charge_is_charging']['help'] = "MOSFET charging"
data['bms_charge_is_charging']['raw_value'] = bool(control_status & 1)
data['bms_charge_is_charging']['value'] = "{0}".format(int(bool(control_status & 1)))
data['bms_charge_is_discharging'] = dict()
data['bms_charge_is_discharging']['help'] = "MOSFET discharging"
data['bms_charge_is_discharging']['raw_value'] = bool(control_status & 1)
data['bms_charge_is_discharging']['value'] = "{0}".format(int(bool(control_status & 1)))
if debug > 1:
if (control_status & 1):
print(" MOSFET charging: yes")
else:
print(" MOSFET charging: no")
if ((control_status >> 1) & 1):
print(" MOSFET discharging: yes")
else:
print(" MOSFET discharging: no")
ntc_num = response[26] # number of temperature sensors
ntc_content = bytearray() # 2 * ntc_num in size
temperatures = list()
for i in range(ntc_num):
temperatures.append((bytes_to_digits(response[27+(2*i)], response[28+(2*i)]) - 2731) * 0.1)
data['bms_temperature_sensor_num'] = dict()
data['bms_temperature_sensor_num']['help'] = "Temperature Sensors"
data['bms_temperature_sensor_num']['raw_value'] = ntc_num
data['bms_temperature_sensor_num']['value'] = "{0}".format(ntc_num)
data['bms_temperature_celcius'] = dict()
data['bms_temperature_celcius']['help'] = "Temperature"
data['bms_temperature_celcius']['units'] = "\u00B0C"
data['bms_temperature_celcius']['label'] = 'sensor'
data['bms_temperature_celcius']['raw_values'] = dict()
data['bms_temperature_celcius']['values'] = dict()
for i, temp in enumerate(temperatures):
data['bms_temperature_celcius']['raw_values'][i+1] = temp
data['bms_temperature_celcius']['values'][i+1] = "{:.2f}".format(temp)
if debug > 1:
print(" Number of temperature sensors: {0}".format(ntc_num))
for i, temp in enumerate(temperatures):
print(u" Temperature sensor {:d}: {:.2f}\u00B0C".format(i+1, temp))
return data
def parse_04_response(response, debug=0):
data = dict()
# Response is 7 + cells * 2 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
# 02 status: \x00 = correct; \x80 = incorrect
# 03 length (usually 8)
# 04 data (size of length)
# ...
# length+4 checksum
# length+5 checksum
# length+6 end \x77
if bytes([response[0]]) != b'\xdd':
print("parse_04_response ERROR: first byte not found: {0}".format(response[0]))
return False
if bytes([response[2]]) == b'\x80':
print("parse_04_response ERROR: error byte returned from BMS: {0}".format(response[2]))
return False
data_len = response[3]
if debug > 2:
print(" Data length (trimming 4 bytes): {0}".format(data_len))
# The checksum is two bytes, offset by data_len + 4
# Five bytes at the front of data
# begin; rw; status, command; length
# The checksum should check command, length, and data: [3] to [3+data_len+1]
first = data_len + 4
second = data_len + 5
if second > len(response):
print("parse_04_response ERROR: cell voltage checksum not found")
return False
checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum):
print("parse_04_response ERROR: failed to validate received checksum")
return False
if data_len > 0:
data['bms_voltage_cells_volts'] = dict()
data['bms_voltage_cells_volts']['help'] = "Cell Voltages"
data['bms_voltage_cells_volts']['units'] = "V"
data['bms_voltage_cells_volts']['label'] = "cell"
data['bms_voltage_cells_volts']['raw_values'] = dict()
data['bms_voltage_cells_volts']['values'] = dict()
for cell in range(int(data_len / 2)):
first = (cell * 2) + 4
second = (cell * 2) + 5
cellv = bytes_to_digits(response[first], response[second]) * 0.001
data['bms_voltage_cells_volts']['raw_values'][cell+1] = cellv
data['bms_voltage_cells_volts']['values'][cell+1] = "{:.3f}".format(cellv)
if debug > 1:
print(" Cell {:.0f}: {:.3f}V".format(cell+1, cellv))
return data
def collect_data(ser, debug=0):
# Request is 7 bytes:
# \xDD for start
# \xA5 for read, \x5A for write
# \x03 for regular info; \x04 for individual voltages
# \x77 ends
data = dict()
reqmsg = bytearray([ 0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77 ])
response_03 = requestMessage(ser, reqmsg, debug)
if len(response_03) == 0:
if debug > 0:
print("collect_data: Error retrieving BMS info. Trying again...")
return False
response_03 = bytearray(response_03)
reqmsg = bytearray([ 0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77 ])
response_04 = requestMessage(ser, reqmsg, debug)
if len(response_04) == 0:
if debug > 0:
print("collect_data: Error retrieving BMS info. Trying again...")
return False
response_04 = bytearray(response_04)
parsed_03 = parse_03_response(response_03, debug)
if parsed_03 is not False:
data.update(parsed_03)
else:
return False
parsed_04 = parse_04_response(response_04, debug)
if parsed_04 is not False:
data.update(parsed_04)
else:
return False
return data
def read_request(connection, debug=0): def read_request(connection, debug=0):
# get length of expected json string # get length of expected json string
request = bytes() request = bytes()
try: try:
request = connection.recv(struct.calcsize('!I')) request = connection.recv(struct.calcsize("!I"))
except Exception as e: except Exception as e:
raise OSError("unable to read request length from socket: {}".format(e)) raise OSError("unable to read request length from socket: {}".format(e))
try: try:
length = struct.unpack('!I', request)[0] length = struct.unpack("!I", request)[0]
except Exception as e: except Exception as e:
raise Exception("unable to determine request length: {}".format(e)) raise Exception("unable to determine request length: {}".format(e))
if debug > 4: if debug > 4:
@@ -552,25 +67,31 @@ def read_request(connection, debug=0):
except Exception as e: except Exception as e:
raise Exception("unable to read incoming request: {}".format(e)) raise Exception("unable to read incoming request: {}".format(e))
if debug > 2: if debug > 2:
print('socket: received {!r}'.format(request_data)) print("socket: received {!r}".format(request_data))
return request_data return request_data
def send_response(connection, response_data, debug=0): def send_response(connection, response_data, debug=0):
try: try:
client = response_data['client'] client = response_data.client
except: except AttributeError:
client = "unknown client" client = response_data.get("client", "unknown client")
if debug > 2: if debug > 2:
print('socket: sending {!r}'.format(response_data)) print("socket: sending {!r}".format(response_data))
try: try:
response = json.dumps(response_data).encode() response = json.dumps(response_data).encode()
response = json.dumps(
response_data,
default=lambda o: {k: dataclass_asdict(v) for k, v in o.items()}
if hasattr(o, "items") and not isinstance(o, dict)
else str(o),
).encode()
# add length to the start of the json string, so we know how much to read on the other end # add length to the start of the json string, so we know how much to read on the other end
length = struct.pack('!I', len(response)) length = struct.pack("!I", len(response))
if debug > 4: if debug > 4:
print('socket: sending {} data of length: {}'.format(client, length)) print("socket: sending {} data of length: {}".format(client, length))
response = length + response response = length + response
if debug > 3: if debug > 3:
print("socket: outgoing response: {}".format(response)) print("socket: outgoing response: {}".format(response))
@@ -581,8 +102,9 @@ def send_response(connection, response_data, debug=0):
def main(): def main():
import argparse import argparse
import socket, socketserver import socket
import pwd, grp import pwd
import grp
signal.signal(signal.SIGTERM, signalHandler) signal.signal(signal.SIGTERM, signalHandler)
@@ -590,29 +112,57 @@ def main():
timestamp = 0 timestamp = 0
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Query JBD BMS and report status', description="Query JBD BMS and report status",
add_help=True, add_help=True,
)
parser.add_argument(
"--device",
"-d",
dest="device",
action="store",
default="/dev/ttyUSB0",
help="USB device to read",
)
parser.add_argument(
"--socket",
"-s",
dest="socket",
action="store",
default="/run/bmspy/bms",
help="Socket to communicate with daemon",
)
parser.add_argument(
"--user",
"-u",
dest="uid_name",
action="store",
default="nobody",
help="Run daemon as user",
)
parser.add_argument(
"--group",
"-g",
dest="gid_name",
action="store",
default="dialout",
help="Run daemon as group",
)
parser.add_argument(
"--verbose",
"-v",
action="count",
default=0,
help="Print more verbose information (can be specified multiple times)",
) )
parser.add_argument('--device', '-d', dest='device', action='store',
default='/dev/ttyUSB0', help='USB device to read')
parser.add_argument('--socket', '-s', dest='socket', action='store',
default='/run/bmspy/bms', help='Socket to communicate with daemon')
parser.add_argument('--user', '-u', dest='uid_name', action='store',
default='nobody', help='Run daemon as user')
parser.add_argument('--group', '-g', dest='gid_name', action='store',
default='dialout', help='Run daemon as group')
parser.add_argument('--verbose', '-v', action='count',
default=0, help='Print more verbose information (can be specified multiple times)')
args = parser.parse_args() args = parser.parse_args()
debug=args.verbose debug = args.verbose
if debug > 0: if debug > 0:
print("Running BMS query daemon on socket {}".format(args.socket)) print("Running BMS query daemon on socket {}".format(args.socket))
ser = initialise_serial(args.device) ser = initialise_serial(args.device)
# Create any necessary directories for the socket
socket_dir = os.path.dirname(args.socket) socket_dir = os.path.dirname(args.socket)
socket_dir_created = False socket_dir_created = False
if not os.path.isdir(socket_dir): if not os.path.isdir(socket_dir):
@@ -625,48 +175,58 @@ def main():
running_uid = pwd.getpwnam(args.uid_name)[2] running_uid = pwd.getpwnam(args.uid_name)[2]
running_gid = grp.getgrnam(args.gid_name)[2] running_gid = grp.getgrnam(args.gid_name)[2]
# If we've created a new directory for the socket, ensure that
# the highest-level directory has the correct permissions
if socket_dir_created: if socket_dir_created:
os.chown(socket_dir, running_uid, running_gid) os.chown(socket_dir, running_uid, running_gid)
os.chmod(socket_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) os.chmod(
socket_dir,
stat.S_IRUSR
| stat.S_IWUSR
| stat.S_IXUSR
| stat.S_IRGRP
| stat.S_IWGRP
| stat.S_IXGRP
| stat.S_IXGRP
| stat.S_IROTH
| stat.S_IXOTH,
)
new_umask = 0o003 new_umask = 0o003
old_umask = os.umask(new_umask) old_umask = os.umask(new_umask)
if debug > 1: if debug > 1:
print('socket: old umask: %s, new umask: %s' % \ print(
(oct(old_umask), oct(new_umask))) "socket: old umask: %s, new umask: %s"
% (oct(old_umask), oct(new_umask))
)
# Try setting the new uid/gid
try: try:
os.setgid(running_gid) os.setgid(running_gid)
except OSError as e: except OSError as e:
print('could not set effective group id: {}'.format(e)) print("could not set effective group id: {}".format(e))
try: try:
os.setuid(running_uid) os.setuid(running_uid)
except OSError as e: except OSError as e:
print('could not set effective user id: {}'.format(e)) print("could not set effective user id: {}".format(e))
final_uid = os.getuid() final_uid = os.getuid()
final_gid = os.getgid() final_gid = os.getgid()
if debug > 0: if debug > 0:
print('socket: running as {}:{}'.format(pwd.getpwuid(final_uid)[0], grp.getgrgid(final_gid)[0])) print(
"socket: running as {}:{}".format(
pwd.getpwuid(final_uid)[0], grp.getgrgid(final_gid)[0]
)
)
# Make sure the socket does not exist
if os.path.exists(args.socket): if os.path.exists(args.socket):
raise OSError("socket {} already exists; exiting...".format(args.socket)) raise OSError("socket {} already exists; exiting...".format(args.socket))
# Create socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Bind the socket to the port
if debug > 2: if debug > 2:
print('starting up on {}'.format(args.socket)) print("starting up on {}".format(args.socket))
sock.bind(args.socket) sock.bind(args.socket)
atexit.register(socket_cleanup, args.socket, debug) atexit.register(socket_cleanup, args.socket, debug)
# Listen for incoming connections
sock.listen(1) sock.listen(1)
while True: while True:
@@ -674,9 +234,8 @@ def main():
client_address = None client_address = None
try: try:
# Wait for a connection
if debug > 2: if debug > 2:
print('socket: waiting for a connection') print("socket: waiting for a connection")
connection, client_address = sock.accept() connection, client_address = sock.accept()
request_data = dict() request_data = dict()
@@ -686,41 +245,52 @@ def main():
print("socket ERROR: {}".format(e)) print("socket ERROR: {}".format(e))
continue continue
client = request_data['client'] or 'unknown' client = request_data["client"] or "unknown"
match request_data['command']: match request_data["command"]:
case 'REGISTER': case "REGISTER":
connected_clients.append(client) connected_clients.append(client)
send_response(
connection, {"status": "REGISTERED", "client": client}, debug
)
send_response(connection, {'status': 'REGISTERED', 'client': client}, debug) case "DEREGISTER":
case 'DEREGISTER':
try: try:
connected_clients.remove(client) connected_clients.remove(client)
except: except Exception:
pass pass
send_response(
connection, {"status": "DEREGISTERED", "client": client}, debug
)
send_response(connection, {'status': 'DEREGISTERED', 'client': client}, debug) send_response(
connection, {"status": "DEREGISTERED", "client": client}, debug
)
case 'GET': case "GET":
timestamp = 0 timestamp = 0
if bool(current_data) is True: if bool(current_data) is True:
timestamp = current_data.get('timestamp', 0) timestamp = current_data.get("timestamp", 0)
print("reading data, current timestamp is {}, time is {}".format(timestamp, time.time())) print(
"reading data, current timestamp is {}, time is {}".format(
timestamp, time.time()
)
)
# only get new data five seconds after the last read # only get new data five seconds after the last read
if timestamp <= time.time() - 5: if timestamp <= time.time() - 5:
current_data = None current_data = None
# Retry every second until we get a result
while bool(current_data) is False: while bool(current_data) is False:
current_data = collect_data(ser, debug) current_data = collect_data(ser, debug)
time.sleep(1) time.sleep(1)
current_data['timestamp'] = time.time() current_data["timestamp"] = time.time()
current_data['client'] = client current_data["client"] = client
send_response(connection, current_data, debug) send_response(connection, current_data, debug)
case _: case _:
print('socket: invalid request from {}'.format(request_data['client'])) print(
"socket: invalid request from {}".format(request_data["client"])
)
break break
except KeyboardInterrupt: except KeyboardInterrupt:
@@ -729,9 +299,9 @@ def main():
sys.exit(1) sys.exit(1)
finally: finally:
# Clean up the connection
if connection: if connection:
connection.close() connection.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()