"""Tests for the helper functions and main loop of ``bmspy.ups``."""

import smtplib
from unittest.mock import patch, MagicMock

import pytest

import bmspy.ups as ups_mod
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, UPS
from bmspy.ups import _get_field_value, _resolve_ups_device, handle_shutdown, handle_email


# ---------------------------------------------------------------------------
# Fixtures: reset module-level globals between tests
# ---------------------------------------------------------------------------

@pytest.fixture(autouse=True)
def _reset_globals():
    """Set the ``bmspy.ups`` state flags to False before and after each test."""

    def _clear():
        for flag in ("scheduled_shutdown", "critical_sent", "warning_sent", "alert_sent"):
            setattr(ups_mod, flag, False)

    _clear()
    yield
    _clear()


# ---------------------------------------------------------------------------
# _get_field_value
# ---------------------------------------------------------------------------

class TestGetFieldValue:
    """Checks for ``_get_field_value`` against scalar, info and multi fields."""

    def _ups(self):
        """Build a UPS with two scalar fields, one info field and one multi field."""
        fields = {
            "bms_current_amps": {
                "help": "Current",
                "raw_value": -2.5,
                "value": "-2.50",
                "units": "A",
            },
            "bms_capacity_charge_ratio": {
                "help": "Percent Charge",
                "raw_value": 0.75,
                "value": "0.75",
                "units": "%",
            },
            "bms_date": {
                "help": "Date",
                "info": "2023-01-15",
            },
            "bms_cells": {
                "help": "Cells",
                "label": "cell",
                "raw_values": {1: 3.6},
                "values": {1: "3.600"},
                "units": "V",
            },
        }
        return UPS.from_dict(fields)

    def test_found_scalar_field(self):
        value = _get_field_value(self._ups(), "bms_current_amps")
        assert value == pytest.approx(-2.5)

    def test_found_charge_ratio(self):
        value = _get_field_value(self._ups(), "bms_capacity_charge_ratio")
        assert value == pytest.approx(0.75)

    def test_field_not_found_returns_none(self):
        value = _get_field_value(self._ups(), "nonexistent_field")
        assert value is None

    def test_info_field_returns_none(self):
        # BMSInfoField is not a scalar, should return None
        value = _get_field_value(self._ups(), "bms_date")
        assert value is None

    def test_multi_field_returns_none(self):
        # BMSMultiField is not a scalar, should return None
        value = _get_field_value(self._ups(), "bms_cells")
        assert value is None


# ---------------------------------------------------------------------------
# handle_shutdown
# ---------------------------------------------------------------------------

class TestHandleShutdown:
    """Checks for ``handle_shutdown`` with ``os.system`` mocked out."""

    def test_shutdown_calls_os_system(self):
        with patch("bmspy.ups.os.system") as fake_system:
            handle_shutdown(action="shutdown", delay=5)
        fake_system.assert_called_once_with("/sbin/shutdown 5")

    def test_shutdown_sets_scheduled_flag(self):
        with patch("bmspy.ups.os.system"):
            handle_shutdown(action="shutdown", delay=5)
        assert ups_mod.scheduled_shutdown is not False

    def test_shutdown_does_not_reschedule_if_already_scheduled(self):
        # Any non-False value marks a shutdown as already scheduled.
        ups_mod.scheduled_shutdown = 9999999999.0
        with patch("bmspy.ups.os.system") as fake_system:
            handle_shutdown(action="shutdown", delay=5)
        fake_system.assert_not_called()

    def test_cancel_calls_os_system(self):
        with patch("bmspy.ups.os.system") as fake_system:
            handle_shutdown(action="cancel")
        fake_system.assert_called_once_with("/sbin/shutdown -c")

    def test_cancel_does_not_set_flag(self):
        with patch("bmspy.ups.os.system"):
            handle_shutdown(action="cancel")
        assert ups_mod.scheduled_shutdown is False


# ---------------------------------------------------------------------------
# handle_email
# ---------------------------------------------------------------------------

class TestHandleEmail:
    """Checks for ``handle_email`` with ``smtplib.SMTP`` replaced by a mock."""

    def _mock_smtp(self):
        """Return ``(smtp_class, server)``.

        ``smtp_class(...)`` acts as a context manager whose ``__enter__``
        yields ``server``.
        """
        server = MagicMock()
        smtp_class = MagicMock()
        connection = smtp_class.return_value
        connection.__enter__ = MagicMock(return_value=server)
        connection.__exit__ = MagicMock(return_value=False)
        return smtp_class, server

    def _send(self, **kwargs):
        """Call ``handle_email(**kwargs)`` under a mocked SMTP; return the server mock."""
        smtp_class, server = self._mock_smtp()
        with patch("bmspy.ups.smtplib.SMTP", smtp_class):
            handle_email(**kwargs)
        return server

    def test_basic_send(self):
        server = self._send(
            text="test message",
            level="Alert",
            recipient="user@example.com",
        )
        server.sendmail.assert_called_once()

    def test_ssl_port_triggers_starttls(self):
        with patch("bmspy.ups.ssl.create_default_context"):
            server = self._send(
                text="test",
                level="Alert",
                recipient="user@example.com",
                port=465,
            )
        server.starttls.assert_called_once()

    def test_port_587_also_triggers_starttls(self):
        with patch("bmspy.ups.ssl.create_default_context"):
            server = self._send(
                text="test",
                level="Alert",
                recipient="user@example.com",
                port=587,
            )
        server.starttls.assert_called_once()

    def test_no_ssl_no_starttls(self):
        server = self._send(
            text="test",
            level="Alert",
            recipient="user@example.com",
            port=25,
        )
        server.starttls.assert_not_called()

    def test_with_credentials_calls_login(self):
        server = self._send(
            text="test",
            level="Alert",
            recipient="user@example.com",
            mailuser="user",
            mailpass="pass",
        )
        server.login.assert_called_once_with("user", "pass")

    def test_without_credentials_no_login(self):
        server = self._send(
            text="test",
            level="Alert",
            recipient="user@example.com",
        )
        server.login.assert_not_called()

    def test_recipient_without_at_gets_hostname_appended(self):
        with patch("bmspy.ups.socket.gethostname", return_value="myhost"):
            server = self._send(
                text="test",
                level="Alert",
                recipient="root",
            )
        # sendmail recipient should be root@myhost
        positional = server.sendmail.call_args[0]
        assert positional[1] == "root@myhost"


# ---------------------------------------------------------------------------
# _resolve_ups_device
# ---------------------------------------------------------------------------

def _simple_ups_data(*names: str) -> dict[str, UPS]:
    """Map each name to its own minimal UPS holding a single scalar field."""
    return {
        name: UPS.from_dict({
            "bms_current_amps": {"help": "A", "raw_value": 0.0, "value": "0.00", "units": "A"},
        })
        for name in names
    }


class TestResolveUpsDevice:
    """Checks for ``_resolve_ups_device`` device selection and its error output."""

    def test_single_device_no_request_returns_it(self):
        data = _simple_ups_data("myups")
        assert _resolve_ups_device(data, None) == "myups"

    def test_requested_device_found(self):
        data = _simple_ups_data("ups1", "ups2")
        assert _resolve_ups_device(data, "ups2") == "ups2"

    def test_requested_device_not_found_returns_none(self, capsys):
        data = _simple_ups_data("ups1")
        result = _resolve_ups_device(data, "missing")
        assert result is None
        assert "missing" in capsys.readouterr().out

    def test_multiple_devices_no_request_returns_none(self, capsys):
        data = _simple_ups_data("ups1", "ups2")
        result = _resolve_ups_device(data, None)
        assert result is None
        out = capsys.readouterr().out
        assert "ups1" in out or "ups2" in out

    def test_empty_data_returns_none(self, capsys):
        result = _resolve_ups_device({}, None)
        assert result is None
        assert "no UPS" in capsys.readouterr().out


# ---------------------------------------------------------------------------
# ups main() - comprehensive loop testing
# ---------------------------------------------------------------------------

def _make_ups(current_amps: float, charge_ratio: float) -> UPS:
    """Build a UPS with bms_current_amps and bms_capacity_charge_ratio."""
    return UPS.from_dict({
        "bms_current_amps": {
            "help": "Current",
            "raw_value": current_amps,
            "value": str(current_amps),
            "units": "A",
        },
        "bms_capacity_charge_ratio": {
            "help": "Charge Ratio",
            "raw_value": charge_ratio,
            "value": str(charge_ratio),
            "units": "%",
        },
    })


def _replay(readings):
    """Build a ``read_data`` side_effect list from loop readings.

    The first element answers the device-discovery call with ``readings[0]``,
    then every reading is replayed in order, and the final element is a
    ``StopIteration`` that the mock raises to end ``main()``.
    """
    return [readings[0], *readings, StopIteration("done")]


def _run_main_mocked(read_effects, argv=None):
    """Run ``ups_mod.main()`` with I/O and both action handlers mocked.

    Args:
        read_effects: ``side_effect`` list for ``bmspy.ups.client.read_data``.
            It must end in a ``StopIteration`` so the run terminates.
        argv: Extra command-line arguments appended after ``"bmspy-ups"``.

    Returns:
        The ``(handle_shutdown, handle_email)`` mocks, for call assertions.
    """
    shutdown = MagicMock()
    email = MagicMock()
    # pytest.raises is listed last so it is the innermost manager: it absorbs
    # the terminating StopIteration before the patches unwind.
    with patch("sys.argv", ["bmspy-ups"] + list(argv or [])), \
            patch("bmspy.ups.client.read_data", side_effect=read_effects), \
            patch("bmspy.ups.client.handle_registration"), \
            patch("bmspy.ups.time.sleep"), \
            patch("bmspy.ups.handle_shutdown", shutdown), \
            patch("bmspy.ups.handle_email", email), \
            pytest.raises(StopIteration):
        ups_mod.main()
    return shutdown, email


class TestUpsMain:
    """Test ups.main() loop behavior by running a limited number of iterations.

    The autouse ``_reset_globals`` fixture clears the module flags before
    each test, so tests only set a flag when they need it to be True.
    """

    def _run_main_with_data(self, data_sequence, argv=None):
        """Run main() with a sequence of UPS data, stopping when data runs out.

        The first call to read_data is the device-discovery call and returns
        data_sequence[0]. Subsequent calls consume data_sequence in order.
        Only ``os.system`` is mocked here, not the shutdown/email handlers.

        Returns the number of loop readings that were consumed.
        """
        discovered = False
        consumed = 0

        def _read_data(*args, **kwargs):
            nonlocal discovered, consumed
            if not discovered:
                discovered = True
                return data_sequence[0]
            if consumed >= len(data_sequence):
                raise StopIteration("done")
            reading = data_sequence[consumed]
            consumed += 1
            return reading

        with patch("sys.argv", ["bmspy-ups"] + (argv or [])), \
                patch("bmspy.ups.client.read_data", side_effect=_read_data), \
                patch("bmspy.ups.client.handle_registration"), \
                patch("bmspy.ups.time.sleep"), \
                patch("bmspy.ups.os.system"), \
                pytest.raises(StopIteration):
            ups_mod.main()
        return consumed

    def _make_data(self, current_amps, charge_ratio):
        """Return a one-device read_data payload keyed by ``"testups"``."""
        return {"testups": _make_ups(current_amps, charge_ratio)}

    def _discharge_readings(self):
        """Current sequence [+1, -1, +1, +1, -1] used by the discharge-alert tests."""
        return [
            self._make_data(1.0, 0.90),
            self._make_data(-1.0, 0.89),
            self._make_data(1.0, 0.90),
            self._make_data(1.0, 0.90),
            self._make_data(-1.0, 0.88),
        ]

    def _recovery_readings(self):
        """Current sequence [-1, +1, -1, +1, +1] used by the power-regained tests."""
        return [
            self._make_data(-1.0, 0.80),
            self._make_data(1.0, 0.81),
            self._make_data(-1.0, 0.79),
            self._make_data(1.0, 0.81),
            self._make_data(1.0, 0.82),
        ]

    def test_main_history_pruned_at_10(self):
        """History is pruned to max 10 items.

        Smoke test: the history list is not observable from here, so this
        only checks that main() gets through 15 readings and stops on the
        terminating StopIteration.
        """
        # Send 15 readings to ensure pruning happens
        readings = [self._make_data(0.0, 0.80)] * 15
        _run_main_mocked(_replay(readings))

    def test_main_runs_with_not_enough_history(self):
        """With < 4 readings, should print 'not enough readings' and continue."""
        readings = [self._make_data(0.0, 0.95)] * 5
        self._run_main_with_data(readings)

    def test_main_debug_2_prints_not_enough_readings(self, capsys):
        """With -v -v and only os.system mocked, 'not enough readings' is printed."""
        readings = [self._make_data(0.0, 0.95)] * 5
        self._run_main_with_data(readings, argv=["-v", "-v"])
        assert "not enough readings" in capsys.readouterr().out.lower()

    def test_main_below_critical_threshold_triggers_shutdown(self):
        """When charge drops below critical threshold, shutdown should be triggered."""
        # Need 5+ readings (>3 in history) to trigger comparisons.
        # All five readings are at 20% (below 30% critical).
        readings = [self._make_data(0.0, 0.20) for _ in range(5)]
        shutdown, _ = _run_main_mocked(_replay(readings), ["--critical", "30"])
        shutdown.assert_called()

    def test_main_below_warning_threshold_sends_email(self):
        """When charge drops below warning threshold, email alert should be sent."""
        # 5 readings at 60% (below 75% warning, above 30% critical)
        readings = [self._make_data(0.0, 0.60) for _ in range(5)]
        _, email = _run_main_mocked(
            _replay(readings), ["--warning", "75", "--critical", "30"]
        )
        email.assert_called()

    def test_main_discharge_alert(self):
        """When current goes negative, discharge alert should be sent.

        Discharge alert requires: current < 0 AND h1 < 0 AND h2 >= 0.
        After 5 reads with seq [+1, -1, +1, +1, -1]:
        h=[+1,-1,+1,+1,-1], current=-1, h1=-1, h2=+1 → triggers alert.
        """
        _, email = _run_main_mocked(_replay(self._discharge_readings()))
        email.assert_called()

    def test_main_power_regained_alert(self):
        """When current goes from negative to positive, recovery alert sent.

        Recovery requires: current >= 0 AND h1 >= 0 AND h2 < 0.
        After 5 reads with seq [-1, +1, -1, +1, +1]:
        h=[-1,+1,-1,+1,+1], current=+1, h1=+1, h2=-1 → recovery!
        """
        ups_mod.alert_sent = True
        _, email = _run_main_mocked(_replay(self._recovery_readings()))
        email.assert_called()

    def test_main_not_enough_history_debug_2(self, capsys):
        """With debug=2 and < 4 readings, prints 'not enough readings'."""
        readings = [self._make_data(0.0, 0.95)] * 5
        _run_main_mocked(_replay(readings), ["-v", "-v"])
        assert "not enough readings" in capsys.readouterr().out.lower()

    def test_main_below_warning_debug_1(self, capsys):
        """With debug=1, warning message is printed."""
        readings = [self._make_data(0.0, 0.60)] * 5
        _run_main_mocked(
            _replay(readings), ["-v", "--warning", "75", "--critical", "30"]
        )
        assert "warning" in capsys.readouterr().out.lower()

    def test_main_discharge_alert_debug_1(self, capsys):
        """With debug=1, discharge alert message is printed."""
        _run_main_mocked(_replay(self._discharge_readings()), ["-v"])
        assert "discharging" in capsys.readouterr().out.lower()

    def test_main_power_regained_debug_1(self, capsys):
        """With debug=1, power regained message is printed."""
        ups_mod.alert_sent = True
        _run_main_mocked(_replay(self._recovery_readings()), ["-v"])
        assert "power regained" in capsys.readouterr().out.lower()

    def test_main_debug_2_prints_current_and_capacity(self, capsys):
        """With debug=2, current and capacity are printed."""
        readings = [self._make_data(1.0, 0.80)] * 5
        _run_main_mocked(_replay(readings), ["-v", "-v"])
        out = capsys.readouterr().out.lower()
        assert "current" in out or "capacity" in out

    def test_main_debug_1_prints_thresholds(self, capsys):
        """With debug=1 and below threshold, threshold messages are printed."""
        readings = [self._make_data(0.0, 0.20)] * 5
        _run_main_mocked(_replay(readings), ["-v", "--critical", "30"])
        out = capsys.readouterr().out.lower()
        assert "critical" in out or "threshold" in out


# ---------------------------------------------------------------------------
# main() — device selection errors and --device flag
# ---------------------------------------------------------------------------

class TestUpsMainDeviceSelection:
    """Checks how main() picks a device and reacts when none can be resolved."""

    def _make_data(self, name, current_amps=0.0, charge_ratio=0.8):
        """Return a one-device read_data payload keyed by ``name``."""
        return {name: _make_ups(current_amps, charge_ratio)}

    def test_main_no_devices_returns_early(self, capsys):
        """main() exits cleanly when no devices are found."""
        with patch("sys.argv", ["bmspy-ups"]), \
                patch("bmspy.ups.client.read_data", return_value={}), \
                patch("bmspy.ups.client.handle_registration"):
            ups_mod.main()
        assert "no UPS" in capsys.readouterr().out

    def test_main_multiple_devices_no_flag_returns_early(self, capsys):
        """main() exits with an error when multiple devices exist and --device is not set."""
        two_devices = {
            "ups1": _make_ups(0.0, 0.9),
            "ups2": _make_ups(0.0, 0.8),
        }
        with patch("sys.argv", ["bmspy-ups"]), \
                patch("bmspy.ups.client.read_data", return_value=two_devices), \
                patch("bmspy.ups.client.handle_registration"):
            ups_mod.main()
        out = capsys.readouterr().out
        assert "ups1" in out or "ups2" in out

    def test_main_ups_flag_selects_device(self):
        """--device selects the correct device from multiple available ones.

        Passes when main() gets past discovery and consumes the six loop
        readings, stopping on the terminating StopIteration.
        """
        discovery = {"ups1": _make_ups(0.0, 0.9), "ups2": _make_ups(0.0, 0.8)}
        loop_readings = [self._make_data("ups2")] * 6
        _run_main_mocked(
            [discovery] + loop_readings + [StopIteration("done")],
            ["--device", "ups2"],
        )

    def test_main_ups_flag_unknown_device_returns_early(self, capsys):
        """--device with an unknown device name exits with an error."""
        with patch("sys.argv", ["bmspy-ups", "--device", "ghost"]), \
                patch("bmspy.ups.client.read_data",
                      return_value={"ups1": _make_ups(0.0, 0.9)}), \
                patch("bmspy.ups.client.handle_registration"):
            ups_mod.main()
        assert "ghost" in capsys.readouterr().out

    def test_main_device_disappears_in_loop_sleeps(self):
        """When the named device is missing from a loop response, main() sleeps and retries."""
        initial = {"testups": _make_ups(0.0, 0.8)}
        # Second call returns empty dict (device gone), third raises StopIteration
        with patch("sys.argv", ["bmspy-ups"]), \
                patch("bmspy.ups.client.read_data",
                      side_effect=[initial, {}, StopIteration("done")]), \
                patch("bmspy.ups.client.handle_registration"), \
                patch("bmspy.ups.time.sleep") as mock_sleep, \
                pytest.raises(StopIteration):
            ups_mod.main()
        mock_sleep.assert_called()