"""Tests for the InfluxDB snapshot/export helpers in ``bmspy.influxdb``."""

import pytest

# Skip this whole module when the optional InfluxDB client is not installed.
# Kept ahead of the ``bmspy.influxdb`` import below (presumably that module
# imports ``influxdb_client_3`` itself -- confirm against bmspy/influxdb.py).
pytest.importorskip("influxdb_client_3", reason="influxdb3-python not installed")

from unittest.mock import MagicMock, patch  # noqa: E402

from bmspy.classes import UPS  # noqa: E402
from bmspy.influxdb import influxdb_create_snapshot  # noqa: E402


def _ups(*field_dicts: dict) -> dict[str, UPS]:
    """Helper: build a ``{"testups": UPS}`` map from a list of field dicts.

    Each dict must contain a ``"_name"`` entry, which becomes the field key;
    the remaining entries form the field definition handed to
    ``UPS.from_dict``. The caller's dicts are copied, never modified.

    Raises:
        KeyError: if a field dict has no ``"_name"`` entry.
    """
    fields = {
        spec["_name"]: {key: val for key, val in spec.items() if key != "_name"}
        for spec in field_dicts
    }
    return {"testups": UPS.from_dict(fields)}


def _voltage_ups(help_text: str = "V") -> dict[str, UPS]:
    """Helper: return a fresh ``{"myups": UPS}`` map with one scalar field.

    The single field is ``bms_voltage`` with raw value 52.0; ``help_text``
    fills the field's ``"help"`` entry.
    """
    return {"myups": UPS.from_dict({
        "bms_voltage": {
            "help": help_text,
            "raw_value": 52.0,
            "value": "52.00",
            "units": "V",
        },
    })}


class TestInfluxdbCreateSnapshot:
    """Point-count and point-content checks for ``influxdb_create_snapshot``."""

    def test_scalar_field_produces_one_point(self):
        """A single scalar field yields exactly one point."""
        points = influxdb_create_snapshot(_voltage_ups("Voltage"))
        assert len(points) == 1

    def test_multi_field_produces_one_point_per_index(self):
        """A multi-valued field yields one point per index."""
        ups_data = {"myups": UPS.from_dict({
            "bms_cells": {
                "help": "Cell Voltages",
                "label": "cell",
                "raw_values": {1: 3.6, 2: 3.61, 3: 3.59},
                "values": {1: "3.600", 2: "3.610", 3: "3.590"},
                "units": "V",
            },
        })}
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 3

    def test_info_field_produces_one_point(self):
        """An info-only field yields exactly one point."""
        ups_data = {"myups": UPS.from_dict({
            "bms_date": {"help": "Manufacture Date", "info": "2023-01-15"},
        })}
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 1

    def test_mixed_fields_sum_correctly(self):
        """Scalar, info and multi fields each contribute their own points."""
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage": {
                "help": "Voltage",
                "raw_value": 52.0,
                "value": "52.00",
                "units": "V",
            },
            "bms_date": {"help": "Date", "info": "2023-01-15"},
            "bms_cells": {
                "help": "Cells",
                "label": "cell",
                "raw_values": {1: 3.6, 2: 3.61},
                "values": {1: "3.600", 2: "3.610"},
                "units": "V",
            },
        })}
        # 1 scalar + 1 info + 2 multi = 4 points
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 4

    def test_multiple_ups_devices(self):
        """Two devices with one scalar field each yield two points."""
        field = {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"}
        ups_data = {
            # dict(field) gives each device its own copy of the definition.
            "ups1": UPS.from_dict({"bms_voltage": dict(field)}),
            "ups2": UPS.from_dict({"bms_voltage": dict(field)}),
        }
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 2

    def test_empty_ups_produces_no_points(self):
        """A device without fields yields an empty list."""
        ups_data = {"myups": UPS.from_dict({})}
        points = influxdb_create_snapshot(ups_data)
        assert points == []

    def test_point_measurement_name(self):
        """The point's measurement name equals the field key."""
        ups_data = {"myups": UPS.from_dict({
            "bms_voltage_total_volts": {
                "help": "Voltage",
                "raw_value": 52.0,
                "value": "52.00",
                "units": "V",
            },
        })}
        points = influxdb_create_snapshot(ups_data)
        assert len(points) == 1
        # Point measurement name should match the field key.
        # NOTE(review): ``_name`` is a private attribute of the point object.
        assert points[0]._name == "bms_voltage_total_volts"

    def test_debug_mode_does_not_raise(self):
        """Calling with ``debug=3`` completes without raising."""
        # debug=3 triggers the debugger() calls — should not raise
        influxdb_create_snapshot(_voltage_ups(), debug=3)

    def test_scalar_point_has_ups_tag(self):
        """A scalar point carries a ``ups`` tag holding the device name."""
        points = influxdb_create_snapshot(_voltage_ups())
        assert points[0]._tags.get("ups") == "myups"

    def test_multi_field_points_have_label_tag(self):
        """Every point of a multi field is tagged with the field's label."""
        ups_data = {"myups": UPS.from_dict({
            "bms_cells": {
                "help": "Cells",
                "label": "cell",
                "raw_values": {1: 3.6, 2: 3.61},
                "values": {1: "3.600", 2: "3.610"},
                "units": "V",
            },
        })}
        points = influxdb_create_snapshot(ups_data)
        assert all("cell" in p._tags for p in points)

    def test_info_field_point_value(self):
        """An info field's text is stored under the point's ``value`` field."""
        ups_data = {"myups": UPS.from_dict({
            "bms_date": {"help": "Manufacture Date", "info": "2023-01-15"},
        })}
        points = influxdb_create_snapshot(ups_data)
        assert points[0]._fields.get("value") == "2023-01-15"


# ---------------------------------------------------------------------------
# influxdb_write_snapshot
# ---------------------------------------------------------------------------

class TestInfluxdbWriteSnapshot:
    """Checks how ``influxdb_write_snapshot`` drives a mocked client."""

    def test_write_called_once(self):
        """One snapshot results in exactly one ``client.write`` call."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        influxdb_write_snapshot(mock_client, "test_bucket", _voltage_ups())
        mock_client.write.assert_called_once()

    def test_write_uses_correct_database(self):
        """The bucket argument is passed to ``write`` as ``database=``."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        influxdb_write_snapshot(mock_client, "mybucket", _voltage_ups())
        call_kwargs = mock_client.write.call_args.kwargs
        assert call_kwargs.get("database") == "mybucket"

    def test_write_exception_does_not_raise(self):
        """An exception raised by ``client.write`` does not propagate."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        mock_client.write.side_effect = Exception("connection failed")
        influxdb_write_snapshot(mock_client, "mybucket", _voltage_ups())

    def test_empty_ups_writes_no_points(self):
        """A device without fields is written as an empty ``record`` list."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        influxdb_write_snapshot(mock_client, "mybucket", {"myups": UPS.from_dict({})})
        call_kwargs = mock_client.write.call_args.kwargs
        assert call_kwargs.get("record") == []


# ---------------------------------------------------------------------------
# influxdb_export (non-daemonized)
# ---------------------------------------------------------------------------

class TestInfluxdbExport:
    """Checks ``influxdb_export`` with the client class and data source patched."""

    def test_single_write_when_not_daemonized(self):
        """With ``daemonize=False`` the client is written to exactly once."""
        from bmspy.influxdb import influxdb_export

        mock_instance = MagicMock()
        ups_data = _voltage_ups()
        with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance), \
                patch("bmspy.influxdb.client.read_data", return_value=ups_data):
            influxdb_export(
                bucket="test",
                url="http://localhost",
                org="org",
                token="token",
                daemonize=False,
            )
        mock_instance.write.assert_called_once()

    def test_client_closed_after_non_daemonized_run(self):
        """The client's ``close`` is called after a one-shot export."""
        from bmspy.influxdb import influxdb_export

        mock_instance = MagicMock()
        ups_data = {"myups": UPS.from_dict({})}
        with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance), \
                patch("bmspy.influxdb.client.read_data", return_value=ups_data):
            influxdb_export(
                bucket="test",
                url="http://localhost",
                org="org",
                token="token",
                daemonize=False,
            )
        mock_instance.close.assert_called()

    def test_env_vars_used_when_no_url(self, monkeypatch):
        """Without a ``url`` argument the client host comes from the environment."""
        from bmspy.influxdb import influxdb_export

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://envhost")
        monkeypatch.setenv("INFLUXDB_V2_ORG", "envorg")
        monkeypatch.setenv("INFLUXDB_V2_TOKEN", "envtoken")
        mock_instance = MagicMock()
        ups_data = {"myups": UPS.from_dict({})}
        with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance) as mock_cls, \
                patch("bmspy.influxdb.client.read_data", return_value=ups_data):
            influxdb_export(bucket="test", daemonize=False)
        # Should have been called with env URL
        call_kwargs = mock_cls.call_args.kwargs
        assert call_kwargs.get("host") == "http://envhost"

    def test_daemonize_true_loops_until_exception(self):
        """With ``daemonize=True`` data is read repeatedly until a read raises."""
        from bmspy.influxdb import influxdb_export

        mock_instance = MagicMock()
        ups_data = {"myups": UPS.from_dict({})}
        call_count = 0

        def _read_data(*args, **kwargs):
            # First call returns data; the second raises to break the loop.
            nonlocal call_count
            call_count += 1
            if call_count >= 2:
                raise StopIteration("stop")
            return ups_data

        # time.sleep is patched so the loop does not actually wait.
        with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance), \
                patch("bmspy.influxdb.client.read_data", side_effect=_read_data), \
                patch("bmspy.influxdb.time.sleep"):
            with pytest.raises(StopIteration):
                influxdb_export(
                    bucket="test",
                    url="http://localhost",
                    org="org",
                    token="token",
                    daemonize=True,
                )
        assert call_count >= 2


# ---------------------------------------------------------------------------
# influx_shutdown
# ---------------------------------------------------------------------------

class TestInfluxShutdown:
    """Checks ``influx_shutdown`` with and without a client."""

    def test_none_is_no_op(self):
        """Passing ``None`` does not raise."""
        from bmspy.influxdb import influx_shutdown

        # Should not raise
        influx_shutdown(None)

    def test_calls_close_on_client(self):
        """A real client object has ``close`` called exactly once."""
        from bmspy.influxdb import influx_shutdown

        mock_client = MagicMock()
        influx_shutdown(mock_client)
        mock_client.close.assert_called_once()


# ---------------------------------------------------------------------------
# influxdb_write_snapshot debug coverage
# ---------------------------------------------------------------------------

class TestInfluxdbWriteSnapshotDebug:
    """Checks the debug output of ``influxdb_write_snapshot``."""

    def test_debug_2_logs_messages(self, capsys):
        """With ``debug=2`` stdout mentions "snapshot" (case-insensitive)."""
        from bmspy.influxdb import influxdb_write_snapshot

        mock_client = MagicMock()
        influxdb_write_snapshot(mock_client, "bucket", _voltage_ups(), debug=2)
        captured = capsys.readouterr()
        assert "snapshot" in captured.out.lower()


# ---------------------------------------------------------------------------
# influxdb_create_snapshot additional debug paths
# ---------------------------------------------------------------------------

class TestInfluxdbCreateSnapshotDebug:
    """Checks the ``debug=3`` stdout output of ``influxdb_create_snapshot``."""

    def test_debug_3_scalar_logs(self, capsys):
        """A scalar field makes stdout mention "value"."""
        influxdb_create_snapshot(_voltage_ups(), debug=3)
        captured = capsys.readouterr()
        assert "value" in captured.out.lower()

    def test_debug_3_multi_logs(self, capsys):
        """A multi field makes stdout mention "labels"."""
        ups_data = {"myups": UPS.from_dict({
            "bms_cells": {
                "help": "Cells",
                "label": "cell",
                "raw_values": {1: 3.6},
                "values": {1: "3.600"},
                "units": "V",
            },
        })}
        influxdb_create_snapshot(ups_data, debug=3)
        captured = capsys.readouterr()
        assert "labels" in captured.out.lower()

    def test_debug_3_info_logs(self, capsys):
        """An info field makes stdout mention "info"."""
        ups_data = {"myups": UPS.from_dict({
            "bms_date": {"help": "Date", "info": "2023-01-15"},
        })}
        influxdb_create_snapshot(ups_data, debug=3)
        captured = capsys.readouterr()
        assert "info" in captured.out.lower()


# ---------------------------------------------------------------------------
# influxdb main()
# ---------------------------------------------------------------------------

class TestInfluxdbMain:
    """Checks the ``main`` entry point's argument and environment handling."""

    def test_main_missing_url_exits(self, monkeypatch):
        """With no URL, org or token available, ``main`` exits."""
        from bmspy.influxdb import main

        monkeypatch.delenv("INFLUXDB_V2_URL", raising=False)
        monkeypatch.delenv("INFLUXDB_V2_ORG", raising=False)
        monkeypatch.delenv("INFLUXDB_V2_TOKEN", raising=False)
        with patch("sys.argv", ["bmspy-influxdb"]):
            with pytest.raises(SystemExit):
                main()

    def test_main_missing_org_exits(self, monkeypatch):
        """With only the URL set in the environment, ``main`` exits."""
        from bmspy.influxdb import main

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://host")
        monkeypatch.delenv("INFLUXDB_V2_ORG", raising=False)
        monkeypatch.delenv("INFLUXDB_V2_TOKEN", raising=False)
        with patch("sys.argv", ["bmspy-influxdb"]):
            with pytest.raises(SystemExit):
                main()

    def test_main_missing_token_exits(self, monkeypatch):
        """With URL and org set but no token, ``main`` exits."""
        from bmspy.influxdb import main

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://host")
        monkeypatch.setenv("INFLUXDB_V2_ORG", "myorg")
        monkeypatch.delenv("INFLUXDB_V2_TOKEN", raising=False)
        with patch("sys.argv", ["bmspy-influxdb"]):
            with pytest.raises(SystemExit):
                main()

    def test_main_calls_influxdb_export(self, monkeypatch):
        """With all three CLI options given, ``main`` calls the export once."""
        from bmspy.influxdb import main

        mock_export = MagicMock()
        argv = ["bmspy-influxdb", "--url", "http://host", "--org", "org", "--token", "tok"]
        with patch("sys.argv", argv), \
                patch("bmspy.influxdb.client.handle_registration"), \
                patch("bmspy.influxdb.influxdb_export", mock_export):
            main()
        mock_export.assert_called_once()

    def test_main_with_env_vars_calls_export(self, monkeypatch):
        """With all three settings in the environment, ``main`` calls the export once."""
        from bmspy.influxdb import main

        monkeypatch.setenv("INFLUXDB_V2_URL", "http://envhost")
        monkeypatch.setenv("INFLUXDB_V2_ORG", "envorg")
        monkeypatch.setenv("INFLUXDB_V2_TOKEN", "envtoken")
        mock_export = MagicMock()
        with patch("sys.argv", ["bmspy-influxdb"]), \
                patch("bmspy.influxdb.client.handle_registration"), \
                patch("bmspy.influxdb.influxdb_export", mock_export):
            main()
        mock_export.assert_called_once()