Files
bmspy/tests/test_influxdb.py
2026-05-02 23:12:29 +02:00

374 lines
15 KiB
Python

import pytest
pytest.importorskip("influxdb_client_3", reason="influxdb3-python not installed")
import pytest
from unittest.mock import patch, MagicMock
from bmspy.classes import UPS
from bmspy.influxdb import influxdb_create_snapshot
def _ups(*field_dicts: dict) -> dict[str, UPS]:
    """Helper: build a {ups_name: UPS} map from a list of field dicts.

    Each dict must carry a ``"_name"`` entry; its value becomes the field
    key and the remaining entries become that field's definition. All
    fields are attached to one UPS stored under the key ``"testups"``.

    The caller's dicts are not modified, so the same literal can be passed
    again in a later call.

    Raises:
        KeyError: if any dict lacks a ``"_name"`` entry.
    """
    fields = {}
    for field in field_dicts:
        # Pop from a shallow copy so the caller's dict keeps its "_name".
        spec = dict(field)
        fields[spec.pop("_name")] = spec
    return {"testups": UPS.from_dict(fields)}
class TestInfluxdbCreateSnapshot:
    """Checks on the list of points returned by ``influxdb_create_snapshot``."""

    def test_scalar_field_produces_one_point(self):
        """One scalar field yields exactly one point."""
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 1

    def test_multi_field_produces_one_point_per_index(self):
        """A multi-valued field yields one point per entry in its value maps."""
        ups_data = {"myups": UPS.from_dict({
            "bms_cells": {
                "help": "Cell Voltages",
                "label": "cell",
                "raw_values": {1: 3.6, 2: 3.61, 3: 3.59},
                "values": {1: "3.600", 2: "3.610", 3: "3.590"},
                "units": "V",
            },
        })}
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 3

    def test_info_field_produces_one_point(self):
        """An info-only field yields exactly one point."""
        ups_data = {"myups": UPS.from_dict({
            "bms_date": {"help": "Manufacture Date", "info": "2023-01-15"},
        })}
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 1

    def test_mixed_fields_sum_correctly(self):
        """Scalar, info and multi fields on one UPS contribute independently."""
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"},
            "bms_date": {"help": "Date", "info": "2023-01-15"},
            "bms_cells": {
                "help": "Cells",
                "label": "cell",
                "raw_values": {1: 3.6, 2: 3.61},
                "values": {1: "3.600", 2: "3.610"},
                "units": "V",
            },
        })}
        # 1 scalar + 1 info + 2 multi = 4 points
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 4

    def test_multiple_ups_devices(self):
        """Each UPS in the input map contributes its own points."""
        field = {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"}
        # dict(field) gives every UPS its own copy of the field definition.
        ups_data = {
            "ups1": UPS.from_dict({"bms_voltage": dict(field)}),
            "ups2": UPS.from_dict({"bms_voltage": dict(field)}),
        }
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 2

    def test_empty_ups_produces_no_points(self):
        """A UPS without fields yields an empty list."""
        ups_data = {"myups": UPS.from_dict({})}
        points = influxdb_create_snapshot(ups_data)
        assert points == []

    def test_point_measurement_name(self):
        """The point's measurement name equals the field key."""
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage_total_volts": {
                "help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"
            },
        })}
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 1
        # Point measurement name should match the field key
        assert points[0]._name == "bms_voltage_total_volts"

    def test_debug_mode_does_not_raise(self):
        """Running with debug=3 completes without raising."""
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        # debug=3 triggers the debugger() calls — should not raise
        influxdb_create_snapshot(ups_data, debug=3)

    def test_scalar_point_has_ups_tag(self):
        """A scalar point carries a ``ups`` tag holding the UPS map key."""
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        points = influxdb_create_snapshot(ups_data)
        assert points[0]._tags.get("ups") == "myups"

    def test_multi_field_points_have_label_tag(self):
        """Every point of a multi field is tagged with the field's label."""
        ups_data = {"myups": UPS.from_dict({
            "bms_cells": {
                "help": "Cells",
                "label": "cell",
                "raw_values": {1: 3.6, 2: 3.61},
                "values": {1: "3.600", 2: "3.610"},
                "units": "V",
            },
        })}
        points = influxdb_create_snapshot(ups_data)
        # all() over an empty list is True, so pin the count first to keep
        # the tag check from passing vacuously.
        assert len(points) == 2
        assert all("cell" in p._tags for p in points)

    def test_info_field_point_value(self):
        """An info field's text is stored in the point's ``value`` field."""
        ups_data = {"myups": UPS.from_dict({
            "bms_date": {"help": "Manufacture Date", "info": "2023-01-15"},
        })}
        points = influxdb_create_snapshot(ups_data)
        assert points[0]._fields.get("value") == "2023-01-15"
# ---------------------------------------------------------------------------
# influxdb_write_snapshot
# ---------------------------------------------------------------------------
class TestInfluxdbWriteSnapshot:
    """Checks on how ``influxdb_write_snapshot`` drives the client's ``write``."""

    def test_write_called_once(self):
        """A snapshot with one field results in exactly one ``write`` call."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        influxdb_write_snapshot(mock_client, "test_bucket", ups_data)
        mock_client.write.assert_called_once()

    def test_write_uses_correct_database(self):
        """The bucket argument is forwarded as the ``database`` keyword."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        influxdb_write_snapshot(mock_client, "mybucket", ups_data)
        call_kwargs = mock_client.write.call_args.kwargs
        assert call_kwargs.get("database") == "mybucket"

    def test_write_exception_does_not_raise(self):
        """An exception raised by ``write`` does not propagate to the caller."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        mock_client.write.side_effect = Exception("connection failed")
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        influxdb_write_snapshot(mock_client, "mybucket", ups_data)
        # Without this the test would also pass if write() were never
        # reached, i.e. without exercising the failure path at all.
        assert mock_client.write.called

    def test_empty_ups_writes_no_points(self):
        """A UPS without fields is written as an empty ``record`` list."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        influxdb_write_snapshot(mock_client, "mybucket", {"myups": UPS.from_dict({})})
        # Fail with a clear message (rather than a TypeError on a None
        # call_args) if write() was not invoked.
        mock_client.write.assert_called()
        call_kwargs = mock_client.write.call_args.kwargs
        assert call_kwargs.get("record") == []
# ---------------------------------------------------------------------------
# influxdb_export (non-daemonized and daemonized)
# ---------------------------------------------------------------------------
class TestInfluxdbExport:
    """Run ``influxdb_export`` with the client class and data reader patched."""

    # Explicit connection arguments shared by the tests that bypass env vars.
    _CONN = {"bucket": "test", "url": "http://localhost", "org": "org", "token": "token"}

    def test_single_write_when_not_daemonized(self):
        """A one-shot export performs exactly one write."""
        from bmspy.influxdb import influxdb_export

        fake_client = MagicMock()
        snapshot = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        with patch("bmspy.influxdb.InfluxDBClient3", return_value=fake_client):
            with patch("bmspy.influxdb.client.read_data", return_value=snapshot):
                influxdb_export(daemonize=False, **self._CONN)
        fake_client.write.assert_called_once()

    def test_client_closed_after_non_daemonized_run(self):
        """A one-shot export closes the client."""
        from bmspy.influxdb import influxdb_export

        fake_client = MagicMock()
        snapshot = {"myups": UPS.from_dict({})}
        with patch("bmspy.influxdb.InfluxDBClient3", return_value=fake_client):
            with patch("bmspy.influxdb.client.read_data", return_value=snapshot):
                influxdb_export(daemonize=False, **self._CONN)
        fake_client.close.assert_called()

    def test_env_vars_used_when_no_url(self, monkeypatch):
        """With no explicit URL, the client is built from INFLUXDB_V2_URL."""
        from bmspy.influxdb import influxdb_export

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://envhost")
        monkeypatch.setenv("INFLUXDB_V2_ORG", "envorg")
        monkeypatch.setenv("INFLUXDB_V2_TOKEN", "envtoken")
        fake_client = MagicMock()
        snapshot = {"myups": UPS.from_dict({})}
        with patch("bmspy.influxdb.InfluxDBClient3", return_value=fake_client) as client_cls:
            with patch("bmspy.influxdb.client.read_data", return_value=snapshot):
                influxdb_export(bucket="test", daemonize=False)
        # Should have been called with env URL
        ctor_kwargs = client_cls.call_args.kwargs
        assert ctor_kwargs.get("host") == "http://envhost"

    def test_daemonize_true_loops_until_exception(self):
        """In daemon mode the reader is polled again until it raises."""
        from bmspy.influxdb import influxdb_export

        fake_client = MagicMock()
        snapshot = {"myups": UPS.from_dict({})}
        reads = []

        def _read_data(*args, **kwargs):
            # First call returns data; every later call aborts the loop.
            reads.append(None)
            if len(reads) >= 2:
                raise StopIteration("stop")
            return snapshot

        with patch("bmspy.influxdb.InfluxDBClient3", return_value=fake_client):
            with patch("bmspy.influxdb.client.read_data", side_effect=_read_data):
                with patch("bmspy.influxdb.time.sleep"):
                    with pytest.raises(StopIteration):
                        influxdb_export(daemonize=True, **self._CONN)
        assert len(reads) >= 2
# ---------------------------------------------------------------------------
# influx_shutdown
# ---------------------------------------------------------------------------
class TestInfluxShutdown:
    """Call ``influx_shutdown`` with and without a client object."""

    def test_none_is_no_op(self):
        """Passing ``None`` returns without raising."""
        from bmspy.influxdb import influx_shutdown

        # Should not raise
        influx_shutdown(None)

    def test_calls_close_on_client(self):
        """A supplied client has ``close`` called exactly once."""
        from bmspy.influxdb import influx_shutdown

        client = MagicMock()
        influx_shutdown(client)
        client.close.assert_called_once()
# ---------------------------------------------------------------------------
# influxdb_write_snapshot debug coverage
# ---------------------------------------------------------------------------
class TestInfluxdbWriteSnapshotDebug:
    """Debug output emitted by ``influxdb_write_snapshot``."""

    def test_debug_2_logs_messages(self, capsys):
        """With debug=2, stdout mentions "snapshot" (case-insensitive)."""
        from bmspy.influxdb import influxdb_write_snapshot

        client = MagicMock()
        snapshot = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        influxdb_write_snapshot(client, "bucket", snapshot, debug=2)
        stdout_text = capsys.readouterr().out.lower()
        assert "snapshot" in stdout_text
# ---------------------------------------------------------------------------
# influxdb_create_snapshot additional debug paths
# ---------------------------------------------------------------------------
class TestInfluxdbCreateSnapshotDebug:
    """Debug output emitted by ``influxdb_create_snapshot`` at debug=3."""

    def test_debug_3_scalar_logs(self, capsys):
        """A scalar field makes stdout mention "value"."""
        snapshot = {"myups": UPS.from_dict({
            "bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
        })}
        influxdb_create_snapshot(snapshot, debug=3)
        stdout_text = capsys.readouterr().out.lower()
        assert "value" in stdout_text

    def test_debug_3_multi_logs(self, capsys):
        """A multi-valued field makes stdout mention "labels"."""
        cells = {
            "help": "Cells",
            "label": "cell",
            "raw_values": {1: 3.6},
            "values": {1: "3.600"},
            "units": "V",
        }
        snapshot = {"myups": UPS.from_dict({"bms_cells": cells})}
        influxdb_create_snapshot(snapshot, debug=3)
        stdout_text = capsys.readouterr().out.lower()
        assert "labels" in stdout_text

    def test_debug_3_info_logs(self, capsys):
        """An info field makes stdout mention "info"."""
        snapshot = {"myups": UPS.from_dict({
            "bms_date": {"help": "Date", "info": "2023-01-15"},
        })}
        influxdb_create_snapshot(snapshot, debug=3)
        stdout_text = capsys.readouterr().out.lower()
        assert "info" in stdout_text
# ---------------------------------------------------------------------------
# influxdb main()
# ---------------------------------------------------------------------------
class TestInfluxdbMain:
    """Command-line entry point ``main``: required settings and dispatch."""

    def test_main_missing_url_exits(self, monkeypatch):
        """With no URL, org or token configured, ``main`` exits."""
        from bmspy.influxdb import main

        for var in ("INFLUXDB_V2_URL", "INFLUXDB_V2_ORG", "INFLUXDB_V2_TOKEN"):
            monkeypatch.delenv(var, raising=False)
        with patch("sys.argv", ["bmspy-influxdb"]), pytest.raises(SystemExit):
            main()

    def test_main_missing_org_exits(self, monkeypatch):
        """With only the URL configured, ``main`` exits."""
        from bmspy.influxdb import main

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://host")
        for var in ("INFLUXDB_V2_ORG", "INFLUXDB_V2_TOKEN"):
            monkeypatch.delenv(var, raising=False)
        with patch("sys.argv", ["bmspy-influxdb"]), pytest.raises(SystemExit):
            main()

    def test_main_missing_token_exits(self, monkeypatch):
        """With URL and org but no token configured, ``main`` exits."""
        from bmspy.influxdb import main

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://host")
        monkeypatch.setenv("INFLUXDB_V2_ORG", "myorg")
        monkeypatch.delenv("INFLUXDB_V2_TOKEN", raising=False)
        with patch("sys.argv", ["bmspy-influxdb"]), pytest.raises(SystemExit):
            main()

    def test_main_calls_influxdb_export(self, monkeypatch):
        """Settings given on the command line lead to one export call."""
        from bmspy.influxdb import main

        export_spy = MagicMock()
        argv = ["bmspy-influxdb", "--url", "http://host", "--org", "org", "--token", "tok"]
        with patch("sys.argv", argv):
            with patch("bmspy.influxdb.client.handle_registration"):
                with patch("bmspy.influxdb.influxdb_export", export_spy):
                    main()
        export_spy.assert_called_once()

    def test_main_with_env_vars_calls_export(self, monkeypatch):
        """Settings given via environment variables lead to one export call."""
        from bmspy.influxdb import main

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://envhost")
        monkeypatch.setenv("INFLUXDB_V2_ORG", "envorg")
        monkeypatch.setenv("INFLUXDB_V2_TOKEN", "envtoken")
        export_spy = MagicMock()
        with patch("sys.argv", ["bmspy-influxdb"]):
            with patch("bmspy.influxdb.client.handle_registration"):
                with patch("bmspy.influxdb.influxdb_export", export_spy):
                    main()
        export_spy.assert_called_once()