Files
bmspy/tests/test_client.py
2026-05-02 23:12:29 +02:00

369 lines
14 KiB
Python

from unittest.mock import patch, MagicMock
import pytest
from bmspy.classes import BMSInfoField, BMSMultiField, BMSScalarField, UPS
from bmspy.client import read_data
# Canned reply used in place of a real ``socket_comms`` response.
# Outer keys are device names; inner keys are field names; each field is a
# plain dict in one of three shapes (single value, info string, or a set of
# labelled values).
MOCK_RESPONSE = {
    "myups": {
        # Single-value field: raw number, formatted string, and units.
        "bms_voltage_total_volts": {
            "help": "Total Voltage",
            "raw_value": 52.0,
            "value": "52.00",
            "units": "V",
        },
        # Info-only field: carries just a descriptive string.
        "bms_manufacture_date": {
            "help": "Date of Manufacture",
            "info": "2023-01-15",
        },
        # Multi-value field: values keyed by index, grouped under a label.
        "bms_temperature_celcius": {
            "help": "Temperature",
            "label": "sensor",
            "raw_values": {1: 25.0},
            "values": {1: "25.00"},
            "units": "°C",
        },
    },
    # Second device with only one field.
    "officeups": {
        "bms_voltage_total_volts": {
            "help": "Total Voltage",
            "raw_value": 48.5,
            "value": "48.50",
            "units": "V",
        },
    },
}
class TestReadData:
    """Check how ``read_data`` converts a mocked ``socket_comms`` reply.

    ``socket_comms`` is always patched, so no real socket is opened.
    """

    def _call(self, response=None):
        """Run ``read_data`` with ``socket_comms`` patched to return a canned reply.

        Args:
            response: Reply that the patched ``socket_comms`` returns.
                ``MOCK_RESPONSE`` is used only when this is ``None``; any
                other value, including an empty dict, is passed through
                unchanged.

        Returns:
            Whatever ``read_data("/fake/socket", "test")`` returns.
        """
        # Compare against None rather than relying on truthiness: a caller
        # that passes {} wants an empty reply, not the default fixture.
        reply = MOCK_RESPONSE if response is None else response
        with patch("bmspy.client.socket_comms", return_value=reply):
            return read_data("/fake/socket", "test")

    def test_returns_dict_of_ups(self):
        """The result is a dict whose values are all ``UPS`` instances."""
        result = self._call()
        assert isinstance(result, dict)
        for device in result.values():
            assert isinstance(device, UPS)

    def test_all_devices_present(self):
        """Every device name in the reply appears as a key of the result."""
        result = self._call()
        assert set(result.keys()) == {"myups", "officeups"}

    def test_scalar_field_deserialized(self):
        """A ``raw_value``/``value``/``units`` entry becomes a ``BMSScalarField``."""
        result = self._call()
        items = dict(result["myups"].items())
        field = items["bms_voltage_total_volts"]
        assert isinstance(field, BMSScalarField)
        assert field.raw_value == 52.0
        assert field.units == "V"

    def test_info_field_deserialized(self):
        """An ``info`` entry becomes a ``BMSInfoField``."""
        result = self._call()
        items = dict(result["myups"].items())
        field = items["bms_manufacture_date"]
        assert isinstance(field, BMSInfoField)
        assert field.info == "2023-01-15"

    def test_multi_field_deserialized(self):
        """A ``raw_values``/``values``/``label`` entry becomes a ``BMSMultiField``."""
        result = self._call()
        items = dict(result["myups"].items())
        field = items["bms_temperature_celcius"]
        assert isinstance(field, BMSMultiField)
        assert field.raw_values == {1: 25.0}
        assert field.label == "sensor"

    def test_ups_is_truthy_when_populated(self):
        """A ``UPS`` built from a non-empty field mapping is truthy."""
        result = self._call()
        assert bool(result["myups"]) is True

    def test_empty_device_response(self):
        """A device with no fields still yields a ``UPS``, and it is falsy."""
        result = self._call({"emptyups": {}})
        assert isinstance(result["emptyups"], UPS)
        assert bool(result["emptyups"]) is False

    def test_ups_filter_forwarded(self):
        """The ``ups`` keyword ends up in the request handed to ``socket_comms``."""
        with patch("bmspy.client.socket_comms") as mock_comms:
            mock_comms.return_value = {"myups": MOCK_RESPONSE["myups"]}
            read_data("/fake/socket", "test", ups="myups")
        # Second positional argument of the socket_comms call is the request.
        request = mock_comms.call_args[0][1]
        assert request.get("ups") == "myups"

    def test_no_ups_filter_not_in_request(self):
        """Without the ``ups`` keyword the request carries no ``"ups"`` key."""
        with patch("bmspy.client.socket_comms") as mock_comms:
            mock_comms.return_value = {}
            read_data("/fake/socket", "test")
        request = mock_comms.call_args[0][1]
        assert "ups" not in request
# ---------------------------------------------------------------------------
# handle_registration
# ---------------------------------------------------------------------------
import bmspy.client as _client_mod
@pytest.fixture(autouse=True)
def _reset_is_registered():
    """Pin ``bmspy.client.is_registered`` to ``False`` around each test.

    The flag is a module-level global that tests in this file set directly,
    so it is cleared both before the test runs and again afterwards.
    """
    # Setup: start every test from the "not registered" state.
    _client_mod.is_registered = False
    yield
    # Teardown: discard whatever state the test left behind.
    _client_mod.is_registered = False
class TestHandleRegistration:
    """Tests for ``handle_registration`` with ``socket_comms`` mocked out.

    Whether a call registers or deregisters is driven by the module-level
    ``is_registered`` flag, which individual tests set before calling.
    """

    def test_register_sets_flag(self):
        """A ``REGISTERED`` reply flips ``is_registered`` to ``True``."""
        reply = {"status": "REGISTERED", "client": "test"}
        with patch("bmspy.client.socket_comms", return_value=reply):
            _client_mod.handle_registration("/fake/socket", "test")
        assert _client_mod.is_registered is True

    def test_register_returns_response(self):
        """The reply from ``socket_comms`` is handed back to the caller."""
        response = {"status": "REGISTERED", "client": "test"}
        with patch("bmspy.client.socket_comms", return_value=response):
            result = _client_mod.handle_registration("/fake/socket", "test")
        assert result == response

    def test_register_sends_register_command(self):
        """When not registered, the request carries ``REGISTER`` and the client name."""
        with patch("bmspy.client.socket_comms") as mock_comms:
            mock_comms.return_value = {"status": "REGISTERED", "client": "test"}
            _client_mod.handle_registration("/fake/socket", "test")
        # Second positional argument of the socket_comms call is the request.
        sent = mock_comms.call_args[0][1]
        assert sent["command"] == "REGISTER"
        assert sent["client"] == "test"

    def test_deregister_clears_flag(self):
        """A ``DEREGISTERED`` reply flips ``is_registered`` back to ``False``."""
        _client_mod.is_registered = True
        reply = {"status": "DEREGISTERED", "client": "test"}
        with patch("bmspy.client.socket_comms", return_value=reply):
            _client_mod.handle_registration("/fake/socket", "test")
        assert _client_mod.is_registered is False

    def test_deregister_sends_deregister_command(self):
        """When already registered, the request carries ``DEREGISTER``."""
        _client_mod.is_registered = True
        with patch("bmspy.client.socket_comms") as mock_comms:
            mock_comms.return_value = {"status": "DEREGISTERED", "client": "test"}
            _client_mod.handle_registration("/fake/socket", "test")
        sent = mock_comms.call_args[0][1]
        assert sent["command"] == "DEREGISTER"

    def test_socket_error_does_not_raise(self):
        """An exception from ``socket_comms`` is absorbed and ``{}`` is returned."""
        failing = patch(
            "bmspy.client.socket_comms",
            side_effect=Exception("connection refused"),
        )
        with failing:
            result = _client_mod.handle_registration("/fake/socket", "test")
        assert result == {}

    def test_invalid_status_does_not_raise(self):
        """An unrecognised ``status`` value does not make the call raise."""
        reply = {"status": "UNKNOWN"}
        with patch("bmspy.client.socket_comms", return_value=reply):
            _client_mod.handle_registration("/fake/socket", "test")

    def test_debug_3_on_deregister_failure(self, capsys):
        """debug=3 path in exception handler when is_registered=True."""
        _client_mod.is_registered = True
        with patch("bmspy.client.socket_comms", side_effect=Exception("fail")):
            _client_mod.handle_registration("/fake/socket", "test", debug=3)
        printed = capsys.readouterr().out
        assert "deregister" in printed.lower() or "fail" in printed
# ---------------------------------------------------------------------------
# socket_comms — real Unix socket
# ---------------------------------------------------------------------------
import json
import socket
import struct
import threading
class TestSocketComms:
    """Test socket_comms with a real Unix socket server running in a thread.

    The helper servers use the framing these tests rely on: a 4-byte
    big-endian length (``struct`` format ``"!I"``) followed by that many
    payload bytes, in both directions.
    """

    @staticmethod
    def _recv_exact(conn, size: int) -> bytes:
        """Read *size* bytes from *conn*, looping over short reads.

        A single ``recv`` on a stream socket may return fewer bytes than
        requested, so keep reading until the count is met.

        Returns:
            The bytes read. Shorter than *size* only if the peer closed the
            connection first.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                break  # Peer closed the connection.
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _serve_once(self, sock_path: str, payload: bytes, ready_event: threading.Event):
        """Accept one connection, read one framed request, reply with *payload*.

        Args:
            sock_path: Filesystem path to bind the Unix socket to.
            payload: Raw bytes sent back, prefixed with their 4-byte length.
            ready_event: Set once the socket is listening, so the test can
                connect without racing the server thread.

        The listening socket, the accepted connection and the socket file are
        all cleaned up on every exit path.
        """
        import os

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as srv:
            srv.bind(sock_path)
            try:
                srv.listen(1)
                ready_event.set()
                conn, _ = srv.accept()
                with conn:
                    header = self._recv_exact(conn, 4)
                    if len(header) < 4:
                        return  # Client disconnected early
                    (length,) = struct.unpack("!I", header)
                    # The request body is consumed but not inspected.
                    self._recv_exact(conn, length)
                    conn.sendall(struct.pack("!I", len(payload)) + payload)
            finally:
                # Only reached after a successful bind, so the path is ours.
                try:
                    os.unlink(sock_path)
                except FileNotFoundError:
                    pass

    def _start_server(self, target, sock_path: str, payload) -> threading.Thread:
        """Start *target* in a daemon thread and wait until it is listening.

        Args:
            target: ``_run_server`` or ``_run_bad_response_server``.
            sock_path: Socket path passed to *target*.
            payload: Second argument passed to *target*.

        Returns:
            The started thread, for the caller to ``join``.
        """
        ready = threading.Event()
        thread = threading.Thread(
            target=target, args=(sock_path, payload, ready), daemon=True
        )
        thread.start()
        # Fail here if the server never came up; otherwise a test expecting
        # SystemExit could pass on a connection failure instead.
        assert ready.wait(timeout=2), "test server did not start listening in time"
        return thread

    def _run_server(self, sock_path: str, response: dict, ready_event: threading.Event):
        """Minimal server: accept one connection, read framed request, send framed response."""
        # Encode before the server signals readiness: one test patches
        # bmspy.client.json.dumps, which presumably is this same json module.
        payload = json.dumps(response).encode()
        self._serve_once(sock_path, payload, ready_event)

    def _run_bad_response_server(self, sock_path: str, bad_payload: bytes, ready_event: threading.Event):
        """Server that sends a bad (non-JSON) response."""
        self._serve_once(sock_path, bad_payload, ready_event)

    def _check_debug_round_trip(self, sock_path: str, debug: int):
        """Do one GET round trip at the given *debug* level and compare the reply."""
        from bmspy.client import socket_comms

        response = {"status": "OK"}
        server = self._start_server(self._run_server, sock_path, response)
        result = socket_comms(
            sock_path, {"command": "GET", "client": "test"}, debug=debug
        )
        server.join(timeout=2)
        assert result == response

    def test_returns_response_dict(self, tmp_path):
        """The decoded server reply is returned as a dict."""
        from bmspy.client import socket_comms

        sock_path = str(tmp_path / "test.sock")
        response = {"status": "REGISTERED", "client": "test"}
        server = self._start_server(self._run_server, sock_path, response)
        result = socket_comms(sock_path, {"command": "REGISTER", "client": "test"})
        server.join(timeout=2)
        assert result == response

    def test_debug_3_does_not_raise(self, tmp_path, capsys):
        """A round trip with ``debug=3`` still returns the reply."""
        self._check_debug_round_trip(str(tmp_path / "test_dbg.sock"), 3)

    def test_debug_4_does_not_raise(self, tmp_path, capsys):
        """A round trip with ``debug=4`` still returns the reply."""
        self._check_debug_round_trip(str(tmp_path / "test_dbg4.sock"), 4)

    def test_debug_5_does_not_raise(self, tmp_path, capsys):
        """A round trip with ``debug=5`` still returns the reply."""
        self._check_debug_round_trip(str(tmp_path / "test_dbg5.sock"), 5)

    def test_connection_refused_does_not_raise(self, tmp_path, capsys):
        """With no server at the path, socket_comms ends in an exception or SystemExit.

        NOTE(review): the test name says "does not raise" but the body
        accepts any exception; confirm which outcome is intended.
        """
        from bmspy.client import socket_comms

        sock_path = str(tmp_path / "nonexistent.sock")
        # Either outcome is accepted. SystemExit is not an Exception subclass,
        # so it has to be listed separately.
        with pytest.raises((SystemExit, Exception)):
            socket_comms(sock_path, {"command": "GET", "client": "test"})

    def test_invalid_json_response_exits(self, tmp_path, capsys):
        """socket_comms calls sys.exit(1) on invalid JSON response."""
        from bmspy.client import socket_comms

        sock_path = str(tmp_path / "bad_json.sock")
        bad_payload = b"not json!!!"
        server = self._start_server(
            self._run_bad_response_server, sock_path, bad_payload
        )
        with pytest.raises(SystemExit):
            socket_comms(sock_path, {"command": "GET", "client": "test"})
        server.join(timeout=2)

    def test_json_encode_failure_exits(self, tmp_path, capsys):
        """socket_comms calls sys.exit(1) when json.dumps raises."""
        from bmspy.client import socket_comms

        sock_path = str(tmp_path / "encode_err.sock")
        # Create a minimal server so the socket connect succeeds
        server = self._start_server(self._run_server, sock_path, {"status": "OK"})
        with patch("bmspy.client.json.dumps", side_effect=TypeError("not serializable")):
            with pytest.raises(SystemExit):
                socket_comms(sock_path, {"command": "GET", "client": "test"})
        server.join(timeout=2)

    def test_non_enoent_socket_error_logs_message(self, tmp_path, capsys):
        """socket_comms logs a different message for non-ENOENT socket errors."""
        from bmspy.client import socket_comms
        import errno

        sock_path = str(tmp_path / "test_err.sock")
        # Make connect raise a socket.error with errno != 2
        err = socket.error("connection refused")
        err.errno = errno.ECONNREFUSED  # not 2 (ENOENT)
        with patch("bmspy.client.socket.socket") as mock_sock_cls:
            mock_sock = MagicMock()
            mock_sock_cls.return_value = mock_sock
            mock_sock.connect.side_effect = err
            # After connect fails the call is expected to end in SystemExit
            # or some other exception; either is accepted.
            with pytest.raises((SystemExit, Exception)):
                socket_comms(sock_path, {"command": "GET", "client": "test"})
        captured = capsys.readouterr()
        assert "socket client" in captured.out or "connection refused" in captured.out.lower()
# ---------------------------------------------------------------------------
# read_data — socket_comms returns None path
# ---------------------------------------------------------------------------
class TestReadDataNone:
    """``read_data`` behaviour when ``socket_comms`` returns ``None``."""

    def test_raises_runtime_error_when_none(self):
        """A ``None`` reply makes ``read_data`` raise ``RuntimeError``."""
        no_reply = patch("bmspy.client.socket_comms", return_value=None)
        expect_error = pytest.raises(RuntimeError, match="No data received")
        # Entered left to right: the patch is active before the call and
        # pytest.raises is the innermost context, as in a nested form.
        with no_reply, expect_error:
            read_data("/fake/socket", "test")