Files
bmspy/tests/test_jbd_bms.py
2026-05-02 23:12:29 +02:00

705 lines
28 KiB
Python

import pytest
from unittest.mock import MagicMock, patch
from bmspy.jbd_bms import (
JBDBMS,
bytes_to_digits,
bytes_to_date,
convert_to_signed,
verify_checksum,
parse_03_response,
parse_04_response,
requestMessage,
serial_cleanup,
collect_data,
)
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, UPS
# ---------------------------------------------------------------------------
# bytes_to_digits
# ---------------------------------------------------------------------------
class TestBytesToDigits:
def test_zero(self):
assert bytes_to_digits(0x00, 0x00) == 0
def test_low_byte_only(self):
assert bytes_to_digits(0x00, 0x0A) == 10
def test_high_byte_only(self):
assert bytes_to_digits(0x01, 0x00) == 256
def test_combined(self):
assert bytes_to_digits(0x14, 0x50) == 5200
def test_max(self):
assert bytes_to_digits(0xFF, 0xFF) == 65535
# ---------------------------------------------------------------------------
# bytes_to_date
# ---------------------------------------------------------------------------
class TestBytesToDate:
def test_known_date(self):
# 0x2E2F = 11823; day=15, mon=1, year=2023
assert bytes_to_date(0x2E, 0x2F) == "2023-01-15"
def test_zero_encodes_epoch(self):
# day=0, mon=0, year=2000
assert bytes_to_date(0x00, 0x00) == "2000-00-00"
def test_day_field(self):
# Only day bits set: 0x001F → day=31, mon=0, year=2000
assert bytes_to_date(0x00, 0x1F) == "2000-00-31"
def test_month_field(self):
# month=12: bits [8:5] = 0b1100 = 12 → raw = 12 << 5 = 384 = 0x0180
assert bytes_to_date(0x01, 0x80) == "2000-12-00"
def test_year_field(self):
# year offset = 24 → value = 24 << 9 = 12288 = 0x3000
assert bytes_to_date(0x30, 0x00) == "2024-00-00"
# ---------------------------------------------------------------------------
# convert_to_signed
# ---------------------------------------------------------------------------
class TestConvertToSigned:
def test_zero(self):
assert convert_to_signed(0) == 0
def test_small_positive(self):
assert convert_to_signed(100) == 100
def test_below_threshold(self):
assert convert_to_signed(1023) == 1023
def test_at_threshold_maps_to_zero(self):
# 1024 → (1024-512) % 1024 - 512 = 0
assert convert_to_signed(1024) == 0
def test_just_above_threshold_maps_to_positive(self):
assert convert_to_signed(1025) == 1
def test_maps_to_negative(self):
# 2047 → (2047-512)%1024 - 512 = 1535%1024 - 512 = 511-512 = -1
assert convert_to_signed(2047) == -1
def test_maps_to_most_negative(self):
# 1536 → (1536-512)%1024 - 512 = 1024%1024 - 512 = -512
assert convert_to_signed(1536) == -512
# ---------------------------------------------------------------------------
# verify_checksum
# ---------------------------------------------------------------------------
class TestVerifyChecksum:
def _make_checksum(self, data: bytes) -> bytes:
s = sum(data)
s = (s ^ 0xFFFF) + 1
return bytes([s >> 8, s & 0xFF])
def test_correct_checksum(self):
data = bytes([0x1B, 0x14, 0x50, 0x00])
chk = self._make_checksum(data)
assert verify_checksum(data, chk) is True
def test_single_byte(self):
data = bytes([0x42])
chk = self._make_checksum(data)
assert verify_checksum(data, chk) is True
def test_wrong_checksum(self):
data = bytes([0x10, 0x20])
assert verify_checksum(data, bytes([0x00, 0x00])) is False
def test_off_by_one(self):
data = bytes([0x10, 0x20])
chk = self._make_checksum(data)
bad = bytes([chk[0], chk[1] ^ 0x01])
assert verify_checksum(data, bad) is False
def test_empty_data(self):
# sum=0 → s = (0^0xFFFF)+1 = 65536, which can never equal a 2-byte chk
assert verify_checksum(bytes(), bytes([0xFF, 0xFF])) is False
# ---------------------------------------------------------------------------
# JBDBMS
# ---------------------------------------------------------------------------
class TestJBDBMS:
def test_empty_is_falsy(self):
assert not JBDBMS()
def test_populated_is_truthy(self, populated_jbdbms):
assert bool(populated_jbdbms)
def test_items_skips_none_fields(self):
bms = JBDBMS()
bms.bms_voltage_total_volts = BMSScalarField(
help="Total Voltage", raw_value=52.0, value="52.00", units="V"
)
keys = [k for k, _ in bms.items()]
assert keys == ["bms_voltage_total_volts"]
def test_items_yields_all_populated_fields(self, populated_jbdbms):
keys = {k for k, _ in populated_jbdbms.items()}
assert "bms_voltage_total_volts" in keys
assert "bms_current_amps" in keys
assert "bms_manufacture_date" in keys
assert "bms_temperature_celcius" in keys
def test_items_yields_correct_types(self, populated_jbdbms):
d = dict(populated_jbdbms.items())
assert isinstance(d["bms_voltage_total_volts"], BMSScalarField)
assert isinstance(d["bms_manufacture_date"], BMSInfoField)
assert isinstance(d["bms_temperature_celcius"], BMSMultiField)
def test_is_ups_subclass(self):
assert isinstance(JBDBMS(), UPS)
# ---------------------------------------------------------------------------
# parse_03_response
# ---------------------------------------------------------------------------
class TestParse03Response:
def test_valid_response_returns_jbdbms(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert isinstance(result, JBDBMS)
def test_voltage(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_voltage_total_volts.raw_value == pytest.approx(52.00)
assert result.bms_voltage_total_volts.units == "V"
def test_current(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_current_amps.raw_value == pytest.approx(0.0)
def test_remaining_capacity(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_capacity_remaining_ah.raw_value == pytest.approx(100.00)
def test_nominal_capacity(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_capacity_nominal_ah.raw_value == pytest.approx(100.00)
def test_charge_cycles(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_charge_cycles.raw_value == 10
def test_manufacture_date(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_manufacture_date.info == "2023-01-15"
def test_rsoc(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_capacity_charge_ratio.raw_value == pytest.approx(0.95)
def test_cell_count(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_cell_number.raw_value == 4
def test_temperature(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_temperature_celcius.raw_values[1] == pytest.approx(25.0)
assert result.bms_temperature_celcius.units == "°C"
def test_mosfet_charging(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_charging.raw_value is True
def test_mosfet_discharging(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_discharging.raw_value is True
def test_no_protection_faults(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_protection_sop_bool.raw_value is False
assert result.bms_protection_cocp_bool.raw_value is False
def test_wrong_start_byte_returns_false(self, valid_03_response):
valid_03_response[0] = 0xAA
assert parse_03_response(valid_03_response) is False
def test_error_status_byte_returns_false(self, valid_03_response):
valid_03_response[2] = 0x80
assert parse_03_response(valid_03_response) is False
def test_bad_checksum_returns_false(self, valid_03_response):
valid_03_response[-1] ^= 0xFF # corrupt last checksum byte
assert parse_03_response(valid_03_response) is False
def test_truncated_response_returns_false(self, valid_03_response):
assert parse_03_response(valid_03_response[:10]) is False
def test_zero_data_len_returns_false(self, valid_03_response):
valid_03_response[3] = 0x00
assert parse_03_response(valid_03_response) is False
# ---------------------------------------------------------------------------
# parse_04_response
# ---------------------------------------------------------------------------
class TestParse04Response:
def test_valid_response_returns_multi_field(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert isinstance(result, BMSMultiField)
def test_cell_count(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert len(result.raw_values) == 4
def test_cell_voltages(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert result.raw_values[1] == pytest.approx(3.600)
assert result.raw_values[2] == pytest.approx(3.601)
assert result.raw_values[3] == pytest.approx(3.599)
assert result.raw_values[4] == pytest.approx(3.598)
def test_cell_voltage_units(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert result.units == "V"
assert result.label == "cell"
def test_wrong_start_byte_returns_false(self, valid_04_response):
valid_04_response[0] = 0xAA
assert parse_04_response(valid_04_response) is False
def test_error_status_byte_returns_false(self, valid_04_response):
valid_04_response[2] = 0x80
assert parse_04_response(valid_04_response) is False
def test_bad_checksum_returns_false(self, valid_04_response):
valid_04_response[-1] ^= 0xFF
assert parse_04_response(valid_04_response) is False
def test_truncated_response_returns_false(self, valid_04_response):
assert parse_04_response(valid_04_response[:5]) is False
def test_zero_data_len_returns_false(self, valid_04_response):
valid_04_response[3] = 0x00
assert parse_04_response(valid_04_response) is False
# ---------------------------------------------------------------------------
# parse_03_response — protection bits and other field variations
# ---------------------------------------------------------------------------
def _recompute_checksum(response: bytearray) -> None:
"""Recompute and update the JBD frame checksum in-place."""
data_len = response[3]
first = data_len + 4
s = sum(response[3:first])
s = (s ^ 0xFFFF) + 1
response[first] = (s >> 8) & 0xFF
response[first + 1] = s & 0xFF
class TestParse03ProtectionBits:
def test_sop_bit_set(self, valid_03_response):
valid_03_response[20] = 0x00
valid_03_response[21] = 0x01 # bit 0 = SOP
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_protection_sop_bool.raw_value is True
assert result.bms_protection_sup_bool.raw_value is False
def test_cocp_bit_set(self, valid_03_response):
valid_03_response[20] = 0x01 # bit 8 = COCP (high byte bit 0)
valid_03_response[21] = 0x00
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_protection_cocp_bool.raw_value is True
def test_all_protections_clear(self, valid_03_response):
result = parse_03_response(valid_03_response)
for attr in [
"bms_protection_sop_bool", "bms_protection_sup_bool",
"bms_protection_wgop_bool", "bms_protection_wgup_bool",
"bms_protection_cotp_bool", "bms_protection_cutp_bool",
"bms_protection_dotp_bool", "bms_protection_dutp_bool",
"bms_protection_cocp_bool", "bms_protection_docp_bool",
"bms_protection_scp_bool", "bms_protection_fdic_bool",
"bms_protection_slmos_bool",
]:
assert getattr(result, attr).raw_value is False, f"{attr} should be False"
def test_negative_current(self, valid_03_response):
# 1536 = 0x0600; convert_to_signed(1536) = -512; -512 * 0.01 = -5.12 A
valid_03_response[6] = 0x06
valid_03_response[7] = 0x00
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_current_amps.raw_value == pytest.approx(-5.12)
def test_cell_1_balancing(self, valid_03_response):
# balance_state_low = bytes_to_digits(response[18], response[19])
# bit 0 of balance_state_low → cell 1 balancing
valid_03_response[18] = 0x00
valid_03_response[19] = 0x01
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_cells_balancing.raw_values[1] is True
assert result.bms_cells_balancing.raw_values[2] is False
def test_mosfet_only_charging(self, valid_03_response):
# control_status = 0x01 → charging only
valid_03_response[24] = 0x01
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_charging.raw_value is True
assert result.bms_charge_is_discharging.raw_value is False
def test_mosfet_only_discharging(self, valid_03_response):
# control_status = 0x02 → discharging only
valid_03_response[24] = 0x02
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_charging.raw_value is False
assert result.bms_charge_is_discharging.raw_value is True
def test_two_temperature_sensors(self):
from tests.conftest import VALID_03_RESPONSE
# Build a modified 03 response with 2 NTC sensors
# data_len changes from 25 to 27 (add 2 bytes for NTC 2)
response = bytearray(VALID_03_RESPONSE[:29]) # bytes 0-28
response[3] = 0x1B # data_len = 27
response[26] = 0x02 # NTC count = 2
response += bytearray([0x0B, 0x6B]) # NTC 2: (2923-2731)*0.1 = 19.2°C
response += bytearray([0x00, 0x00]) # placeholder checksum
_recompute_checksum(response)
result = parse_03_response(response)
assert isinstance(result, JBDBMS)
assert len(result.bms_temperature_celcius.raw_values) == 2
assert result.bms_temperature_celcius.raw_values[1] == pytest.approx(25.0)
assert result.bms_temperature_celcius.raw_values[2] == pytest.approx(19.2)
# ---------------------------------------------------------------------------
# serial_cleanup
# ---------------------------------------------------------------------------
class TestSerialCleanup:
def test_closes_open_port(self):
ser = MagicMock()
ser.is_open = True
serial_cleanup(ser)
ser.reset_input_buffer.assert_called()
ser.reset_output_buffer.assert_called()
ser.close.assert_called_once()
def test_does_not_close_if_not_open(self):
ser = MagicMock()
ser.is_open = False
serial_cleanup(ser)
ser.close.assert_not_called()
def test_debug_3_logs_message(self, capsys):
ser = MagicMock()
ser.is_open = True
serial_cleanup(ser, debug=3)
captured = capsys.readouterr()
assert "cleaning up" in captured.out
# ---------------------------------------------------------------------------
# requestMessage
# ---------------------------------------------------------------------------
class TestRequestMessage:
def _make_serial(self, response_bytes=b"\x77"):
ser = MagicMock()
ser.is_open = True
ser.in_waiting = 1
ser.write.return_value = 7
ser.read_until.return_value = response_bytes
return ser
def test_returns_response_bytes(self):
payload = b"\xDD\xA5\x00\x04\x01\x02\x03\x04\x77"
ser = self._make_serial(payload)
reqmsg = bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
result = requestMessage(ser, reqmsg)
assert result == payload
def test_open_failure_returns_false(self):
ser = MagicMock()
ser.is_open = True
ser.open.side_effect = Exception("port not found")
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is False
def test_short_write_returns_false(self):
ser = self._make_serial()
ser.write.return_value = 3 # fewer bytes than message length
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is False
def test_empty_response_returns_empty_string(self):
ser = self._make_serial(b"")
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result == ""
def test_exception_during_read_logs_error(self, capsys):
"""When read_until raises, requestMessage logs the exception."""
ser = MagicMock()
ser.is_open = True
ser.in_waiting = 1
ser.write.return_value = 7
ser.read_until.side_effect = Exception("serial port error")
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is None
captured = capsys.readouterr()
assert "error communicating" in captured.out.lower()
def test_debug_3_logs_startup(self, capsys):
payload = b"\xDD\xA5\x00\x04\x01\x02\x03\x04\x77"
ser = self._make_serial(payload)
requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]), debug=3)
captured = capsys.readouterr()
assert "starting up monitor" in captured.out
def test_wait_timeout_returns_empty_string(self):
"""When in_waiting stays 0 long enough, returns empty string."""
ser = MagicMock()
ser.is_open = True
ser.in_waiting = 0
ser.write.return_value = 7
call_count = 0
def _in_waiting_prop():
nonlocal call_count
call_count += 1
return 0
# Simulate in_waiting always 0 → timeout after wait_time > 2
ser_mock = MagicMock()
ser_mock.is_open = True
ser_mock.write.return_value = 7
# Make in_waiting always return 0 (property mock)
type(ser_mock).in_waiting = property(lambda self: 0)
with patch("bmspy.jbd_bms.time.sleep"):
result = requestMessage(
ser_mock,
bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]),
debug=3,
)
assert result == ""
def test_cannot_open_port_returns_none(self):
"""When ser.is_open is False after open() call, returns None."""
ser = MagicMock()
ser.is_open = False
# open() doesn't raise but port remains closed
ser.open.return_value = None
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is None
# ---------------------------------------------------------------------------
# collect_data
# ---------------------------------------------------------------------------
class TestCollectData:
def test_successful_collect_returns_jbdbms(self, valid_03_response, valid_04_response):
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert isinstance(result, JBDBMS)
assert result.bms_voltage_cells_volts is not None
def test_empty_03_response_returns_false(self):
with patch("bmspy.jbd_bms.requestMessage", return_value=b""):
result = collect_data(MagicMock())
assert result is False
def test_empty_04_response_returns_false(self, valid_03_response):
responses = [bytes(valid_03_response), b""]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert result is False
def test_bad_03_checksum_returns_false(self, valid_03_response, valid_04_response):
valid_03_response[-1] ^= 0xFF # corrupt checksum
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert result is False
def test_bad_04_checksum_returns_false(self, valid_03_response, valid_04_response):
valid_04_response[-1] ^= 0xFF # corrupt checksum
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert result is False
def test_collect_data_debug_1(self, valid_03_response, valid_04_response, capsys):
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock(), debug=1)
assert isinstance(result, JBDBMS)
# ---------------------------------------------------------------------------
# parse_03 and parse_04 debug coverage
# ---------------------------------------------------------------------------
class TestParse03Debug:
def test_debug_2_logs_voltage(self, valid_03_response, capsys):
parse_03_response(valid_03_response, debug=2)
captured = capsys.readouterr()
assert "voltage" in captured.out.lower() or "52" in captured.out
def test_debug_3_logs_data_length(self, valid_03_response, capsys):
parse_03_response(valid_03_response, debug=3)
captured = capsys.readouterr()
assert "data length" in captured.out.lower() or "25" in captured.out
def test_debug_3_logs_protection_state(self, valid_03_response, capsys):
parse_03_response(valid_03_response, debug=3)
captured = capsys.readouterr()
assert "protection state" in captured.out.lower() or "sop" in captured.out.lower()
class TestParse04Debug:
def test_debug_2_logs_cell_voltage(self, valid_04_response, capsys):
parse_04_response(valid_04_response, debug=2)
captured = capsys.readouterr()
assert "cell" in captured.out.lower() or "3.6" in captured.out
def test_debug_3_logs_data_length(self, valid_04_response, capsys):
parse_04_response(valid_04_response, debug=3)
captured = capsys.readouterr()
assert "data length" in captured.out.lower() or "8" in captured.out
# ---------------------------------------------------------------------------
# parse_03_response — cells >= 16 (branch coverage)
# ---------------------------------------------------------------------------
class TestCalculateChecksum:
def test_returns_empty_string(self):
from bmspy.jbd_bms import calculate_checksum
result = calculate_checksum(b"\x01\x02\x03")
assert result == ""
class TestParse03DataLenZero:
def test_data_len_zero_with_valid_checksum_returns_false(self):
"""Build a response where data_len=0 and checksum is valid."""
response = bytearray([
0xDD, 0xA5, 0x00, 0x00, # data_len = 0
0x00, 0x00, # checksum positions (first=4, second=5)
0x77, # end
])
_recompute_checksum(response)
result = parse_03_response(response)
assert result is False
class TestParse04DataLenZero:
def test_data_len_zero_with_valid_checksum_returns_false(self):
"""Build a parse_04 response where data_len=0 and checksum is valid."""
response = bytearray([
0xDD, 0xA5, 0x00, 0x00, # data_len = 0
0x00, 0x00, # checksum positions
0x77, # end
])
_recompute_checksum(response)
result = parse_04_response(response)
assert result is False
class TestParse03HighCellCount:
def test_17_cells_uses_high_balance_state(self):
"""Build a 17-cell response to cover the cell >= 16 branch."""
from tests.conftest import VALID_03_RESPONSE
# We need data_len = 25 + (17-4)*2 = 25 for 4 NTCs... actually we just
# need 17 cells. data_len stays 25 but we set cell count to 17.
response = bytearray(VALID_03_RESPONSE)
# Set cell count to 17
response[25] = 17
# Recompute checksum
_recompute_checksum(response)
result = parse_03_response(response)
assert isinstance(result, JBDBMS)
assert result.bms_cell_number.raw_value == 17
# Cell 17 should be in bms_cells_balancing
assert 17 in result.bms_cells_balancing.raw_values
# ---------------------------------------------------------------------------
# collect_data debug paths
# ---------------------------------------------------------------------------
class TestCollectDataDebug:
def test_debug_1_empty_03_logs(self, capsys):
with patch("bmspy.jbd_bms.requestMessage", return_value=b""):
collect_data(MagicMock(), debug=1)
captured = capsys.readouterr()
assert "error" in captured.out.lower()
def test_debug_1_empty_04_logs(self, valid_03_response, capsys):
responses = [bytes(valid_03_response), b""]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
collect_data(MagicMock(), debug=1)
captured = capsys.readouterr()
assert "error" in captured.out.lower()
# ---------------------------------------------------------------------------
# initialise_serial — covered with mocking
# ---------------------------------------------------------------------------
class TestInitialiseSerial:
def test_returns_serial_object(self):
from bmspy.jbd_bms import initialise_serial
with patch("bmspy.jbd_bms.serial.Serial") as mock_serial_cls:
mock_ser = MagicMock()
mock_serial_cls.return_value = mock_ser
result = initialise_serial("/dev/ttyUSB0", debug=0)
assert result is mock_ser
def test_sets_serial_params(self):
from bmspy.jbd_bms import initialise_serial
import serial as _serial
with patch("bmspy.jbd_bms.serial.Serial") as mock_serial_cls:
mock_ser = MagicMock()
mock_serial_cls.return_value = mock_ser
initialise_serial("/dev/ttyUSB0")
# Verify parity was set
assert mock_ser.parity == _serial.PARITY_NONE