import pytest import smtplib from unittest.mock import patch, MagicMock import bmspy.ups as ups_mod from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, UPS from bmspy.ups import _get_field_value, handle_shutdown, handle_email # --------------------------------------------------------------------------- # Fixtures: reset module-level globals between tests # --------------------------------------------------------------------------- @pytest.fixture(autouse=True) def _reset_globals(): ups_mod.scheduled_shutdown = False ups_mod.critical_sent = False ups_mod.warning_sent = False ups_mod.alert_sent = False yield ups_mod.scheduled_shutdown = False ups_mod.critical_sent = False ups_mod.warning_sent = False ups_mod.alert_sent = False # --------------------------------------------------------------------------- # _get_field_value # --------------------------------------------------------------------------- class TestGetFieldValue: def _ups(self): return UPS.from_dict({ "bms_current_amps": { "help": "Current", "raw_value": -2.5, "value": "-2.50", "units": "A", }, "bms_capacity_charge_ratio": { "help": "Percent Charge", "raw_value": 0.75, "value": "0.75", "units": "%", }, "bms_date": { "help": "Date", "info": "2023-01-15", }, "bms_cells": { "help": "Cells", "label": "cell", "raw_values": {1: 3.6}, "values": {1: "3.600"}, "units": "V", }, }) def test_found_scalar_field(self): ups = self._ups() assert _get_field_value(ups, "bms_current_amps") == pytest.approx(-2.5) def test_found_charge_ratio(self): ups = self._ups() assert _get_field_value(ups, "bms_capacity_charge_ratio") == pytest.approx(0.75) def test_field_not_found_returns_none(self): ups = self._ups() assert _get_field_value(ups, "nonexistent_field") is None def test_info_field_returns_none(self): # BMSInfoField is not a scalar, should return None ups = self._ups() assert _get_field_value(ups, "bms_date") is None def test_multi_field_returns_none(self): # BMSMultiField is not a scalar, should return None ups = self._ups() assert _get_field_value(ups, "bms_cells") is None # --------------------------------------------------------------------------- # handle_shutdown # --------------------------------------------------------------------------- class TestHandleShutdown: def test_shutdown_calls_os_system(self): with patch("bmspy.ups.os.system") as mock_sys: handle_shutdown(action="shutdown", delay=5) mock_sys.assert_called_once_with("/sbin/shutdown 5") def test_shutdown_sets_scheduled_flag(self): with patch("bmspy.ups.os.system"): handle_shutdown(action="shutdown", delay=5) assert ups_mod.scheduled_shutdown is not False def test_shutdown_does_not_reschedule_if_already_scheduled(self): ups_mod.scheduled_shutdown = 9999999999.0 with patch("bmspy.ups.os.system") as mock_sys: handle_shutdown(action="shutdown", delay=5) mock_sys.assert_not_called() def test_cancel_calls_os_system(self): with patch("bmspy.ups.os.system") as mock_sys: handle_shutdown(action="cancel") mock_sys.assert_called_once_with("/sbin/shutdown -c") def test_cancel_does_not_set_flag(self): with patch("bmspy.ups.os.system"): handle_shutdown(action="cancel") assert ups_mod.scheduled_shutdown is False # --------------------------------------------------------------------------- # handle_email # --------------------------------------------------------------------------- class TestHandleEmail: def _mock_smtp(self): mock_server = MagicMock() mock_smtp_class = MagicMock() mock_smtp_class.return_value.__enter__ = MagicMock(return_value=mock_server) mock_smtp_class.return_value.__exit__ = MagicMock(return_value=False) return mock_smtp_class, mock_server def test_basic_send(self): mock_smtp_class, mock_server = self._mock_smtp() with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class): handle_email( text="test message", level="Alert", recipient="user@example.com", ) mock_server.sendmail.assert_called_once() def test_ssl_port_triggers_starttls(self): mock_smtp_class, mock_server = self._mock_smtp() with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \ patch("bmspy.ups.ssl.create_default_context") as mock_ssl: handle_email( text="test", level="Alert", recipient="user@example.com", port=465, ) mock_server.starttls.assert_called_once() def test_port_587_also_triggers_starttls(self): mock_smtp_class, mock_server = self._mock_smtp() with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \ patch("bmspy.ups.ssl.create_default_context"): handle_email( text="test", level="Alert", recipient="user@example.com", port=587, ) mock_server.starttls.assert_called_once() def test_no_ssl_no_starttls(self): mock_smtp_class, mock_server = self._mock_smtp() with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class): handle_email( text="test", level="Alert", recipient="user@example.com", port=25, ) mock_server.starttls.assert_not_called() def test_with_credentials_calls_login(self): mock_smtp_class, mock_server = self._mock_smtp() with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class): handle_email( text="test", level="Alert", recipient="user@example.com", mailuser="user", mailpass="pass", ) mock_server.login.assert_called_once_with("user", "pass") def test_without_credentials_no_login(self): mock_smtp_class, mock_server = self._mock_smtp() with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class): handle_email( text="test", level="Alert", recipient="user@example.com", ) mock_server.login.assert_not_called() def test_recipient_without_at_gets_hostname_appended(self): mock_smtp_class, mock_server = self._mock_smtp() with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \ patch("bmspy.ups.socket.gethostname", return_value="myhost"): handle_email( text="test", level="Alert", recipient="root", ) # sendmail recipient should be root@myhost args = mock_server.sendmail.call_args[0] assert args[1] == "root@myhost" # --------------------------------------------------------------------------- # ups main() - comprehensive loop testing # --------------------------------------------------------------------------- def _make_ups(current_amps: float, charge_ratio: float) -> UPS: """Build a UPS with bms_current_amps and bms_capacity_charge_ratio.""" return UPS.from_dict({ "bms_current_amps": { "help": "Current", "raw_value": current_amps, "value": str(current_amps), "units": "A", }, "bms_capacity_charge_ratio": { "help": "Charge Ratio", "raw_value": charge_ratio, "value": str(charge_ratio), "units": "%", }, }) class TestUpsMain: """Test ups.main() loop behavior by running a limited number of iterations.""" def _run_main_with_data(self, data_sequence, argv=None, extra_patches=None): """Run main() with a sequence of UPS data, stopping when data runs out.""" call_count = 0 def _read_data(*args, **kwargs): nonlocal call_count if call_count >= len(data_sequence): raise StopIteration("done") result = data_sequence[call_count] call_count += 1 return result patches = { "bmspy.ups.client.read_data": _read_data, "bmspy.ups.client.handle_registration": MagicMock(), "bmspy.ups.time.sleep": MagicMock(), "bmspy.ups.os.system": MagicMock(), } if extra_patches: patches.update(extra_patches) with patch("sys.argv", ["bmspy-ups"] + (argv or [])): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=_read_data), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.os.system"): ups_mod.main() return call_count def _make_data(self, current_amps, charge_ratio): return {"testups": _make_ups(current_amps, charge_ratio)} def test_main_history_pruned_at_10(self): """History is pruned to max 10 items.""" # Send 15 readings to ensure pruning happens data_seq = [self._make_data(0.0, 0.80)] * 15 with patch("sys.argv", ["bmspy-ups"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email"): ups_mod.main() def test_main_runs_with_not_enough_history(self): """With < 4 readings, should print 'not enough readings' and continue.""" data = [self._make_data(0.0, 0.95)] * 5 self._run_main_with_data(data) def test_main_debug_2_prints_not_enough_readings(self, capsys): data = [self._make_data(0.0, 0.95)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "-v"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data + [StopIteration]),\ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.os.system"): ups_mod.main() def test_main_below_critical_threshold_triggers_shutdown(self): """When charge drops below critical threshold, shutdown should be triggered.""" # Need 5+ readings (>3 in history) to trigger comparisons ups_mod.scheduled_shutdown = False ups_mod.critical_sent = False data_seq = [] # Populate history with 5 readings all at 20% (below 30% critical) for _ in range(5): data_seq.append(self._make_data(0.0, 0.20)) mock_shutdown = MagicMock() mock_email = MagicMock() with patch("sys.argv", ["bmspy-ups", "--critical", "30"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown", mock_shutdown), \ patch("bmspy.ups.handle_email", mock_email): ups_mod.main() mock_shutdown.assert_called() def test_main_below_warning_threshold_sends_email(self): """When charge drops below warning threshold, email alert should be sent.""" ups_mod.scheduled_shutdown = False ups_mod.warning_sent = False data_seq = [] # 5 readings at 60% (below 75% warning, above 30% critical) for _ in range(5): data_seq.append(self._make_data(0.0, 0.60)) mock_email = MagicMock() with patch("sys.argv", ["bmspy-ups", "--warning", "75", "--critical", "30"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email", mock_email): ups_mod.main() mock_email.assert_called() def test_main_discharge_alert(self): """When current goes negative, discharge alert should be sent. Discharge alert requires: current < 0 AND h1 < 0 AND h2 >= 0. After 5 reads with seq [+1, -1, +1, +1, -1]: h=[+1,-1,+1,+1,-1], current=-1, h1=-1, h2=+1 → triggers alert. """ ups_mod.alert_sent = False data_seq = [ self._make_data(1.0, 0.90), # r1: +1 self._make_data(-1.0, 0.89), # r2: -1 (will be h1) self._make_data(1.0, 0.90), # r3: +1 (will be h2) self._make_data(1.0, 0.90), # r4: +1 self._make_data(-1.0, 0.88), # r5: -1 (current) → alert! ] mock_email = MagicMock() with patch("sys.argv", ["bmspy-ups"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email", mock_email): ups_mod.main() mock_email.assert_called() def test_main_power_regained_alert(self): """When current goes from negative to positive, recovery alert sent. Recovery requires: current >= 0 AND h1 >= 0 AND h2 < 0. After 5 reads with seq [-1, +1, -1, +1, +1]: h=[-1,+1,-1,+1,+1], current=+1, h1=+1, h2=-1 → recovery! """ ups_mod.alert_sent = True ups_mod.scheduled_shutdown = False data_seq = [ self._make_data(-1.0, 0.80), # r1: -1 self._make_data(1.0, 0.81), # r2: +1 (will be h1) self._make_data(-1.0, 0.79), # r3: -1 (will be h2) self._make_data(1.0, 0.81), # r4: +1 self._make_data(1.0, 0.82), # r5: +1 (current) → recovery! ] mock_email = MagicMock() mock_shutdown = MagicMock() with patch("sys.argv", ["bmspy-ups"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown", mock_shutdown), \ patch("bmspy.ups.handle_email", mock_email): ups_mod.main() mock_email.assert_called() def test_main_not_enough_history_debug_2(self, capsys): """With debug=2 and < 4 readings, prints 'not enough readings'.""" data_seq = [self._make_data(0.0, 0.95)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "-v"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email"): ups_mod.main() captured = capsys.readouterr() assert "not enough readings" in captured.out.lower() def test_main_below_warning_debug_1(self, capsys): """With debug=1, warning message is printed.""" ups_mod.warning_sent = False data_seq = [self._make_data(0.0, 0.60)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "--warning", "75", "--critical", "30"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email"): ups_mod.main() captured = capsys.readouterr() assert "warning" in captured.out.lower() def test_main_discharge_alert_debug_1(self, capsys): """With debug=1, discharge alert message is printed.""" ups_mod.alert_sent = False data_seq = [ self._make_data(1.0, 0.90), self._make_data(-1.0, 0.89), self._make_data(1.0, 0.90), self._make_data(1.0, 0.90), self._make_data(-1.0, 0.88), ] with patch("sys.argv", ["bmspy-ups", "-v"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email"): ups_mod.main() captured = capsys.readouterr() assert "discharging" in captured.out.lower() def test_main_power_regained_debug_1(self, capsys): """With debug=1, power regained message is printed.""" ups_mod.alert_sent = True data_seq = [ self._make_data(-1.0, 0.80), self._make_data(1.0, 0.81), self._make_data(-1.0, 0.79), self._make_data(1.0, 0.81), self._make_data(1.0, 0.82), ] with patch("sys.argv", ["bmspy-ups", "-v"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email"): ups_mod.main() captured = capsys.readouterr() assert "power regained" in captured.out.lower() def test_main_debug_2_prints_current_and_capacity(self, capsys): """With debug=2, current and capacity are printed.""" data_seq = [self._make_data(1.0, 0.80)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "-v"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email"): ups_mod.main() captured = capsys.readouterr() assert "current" in captured.out.lower() or "capacity" in captured.out.lower() def test_main_debug_1_prints_thresholds(self, capsys): """With debug=1 and below threshold, threshold messages are printed.""" ups_mod.critical_sent = False data_seq = [self._make_data(0.0, 0.20)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "--critical", "30"]): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ patch("bmspy.ups.handle_email"): ups_mod.main() captured = capsys.readouterr() assert "critical" in captured.out.lower() or "threshold" in captured.out.lower()