Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: XNLLLLH <XNLLLLH@users.noreply.github.com>
312 lines
11 KiB
Python
312 lines
11 KiB
Python
"""Phase 10.4 — comprehensive tests for ``IdleDetector``.
|
|
|
|
Covers the 11-test matrix from CONTEXT 10.4:
|
|
- HIDIdleTime parses ioreg output (ns -> sec).
|
|
- HIDIdleTime returns None when ioreg missing (FileNotFoundError).
|
|
- pmset detects sleep event within window.
|
|
- pmset returns False when no recent sleep within window.
|
|
- pmset returns False when pmset binary missing.
|
|
- sleep_eligible heartbeat-idle path (heartbeat_idle_30min=True).
|
|
- sleep_eligible HID idle path (HIDIdleTime >= 30 min).
|
|
- sleep_eligible pmset path (pmset_recent_sleep=True).
|
|
- sleep_eligible all-False path.
|
|
- status() reports IdleStatus shape with all signals available.
|
|
- status() when signals missing reports empty available_signals.
|
|
|
|
All subprocess interactions are mocked so the suite is deterministic and
|
|
runs on non-macOS hosts as well — real ioreg / pmset spawns would make
|
|
the suite host-dependent.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import subprocess
|
|
from datetime import datetime, timedelta, timezone
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from iai_mcp.idle_detector import IdleDetector, IdleStatus
|
|
|
|
|
|
# ---------------------------------------------------------------- fixtures
|
|
|
|
|
|
def _completed_process(
|
|
stdout: str = "", returncode: int = 0
|
|
) -> subprocess.CompletedProcess[str]:
|
|
"""Build a fake ``CompletedProcess`` for subprocess.run mocks."""
|
|
proc: subprocess.CompletedProcess[str] = subprocess.CompletedProcess(
|
|
args=[], returncode=returncode, stdout=stdout, stderr=""
|
|
)
|
|
return proc
|
|
|
|
|
|
def _ioreg_stdout(idle_ns: int) -> str:
|
|
"""Build a minimal ioreg-shaped stdout containing a HIDIdleTime line.
|
|
|
|
The real ioreg output is a deeply nested I/O-Registry tree; we only
|
|
need the literal token the parser searches for.
|
|
"""
|
|
return (
|
|
"+-o IOHIDSystem <class IOHIDSystem, id 0x100000abc, registered>\n"
|
|
f' | "HIDIdleTime" = {idle_ns}\n'
|
|
' | "DisplayWrangler" = 1\n'
|
|
)
|
|
|
|
|
|
def _pmset_log_stdout(events: list[tuple[str, str]]) -> str:
|
|
"""Build a fake pmset -g log stdout from ``[(timestamp, marker), ...]``.
|
|
|
|
Each event becomes a line in the format the real pmset emits — the
|
|
timestamp regex anchors the line, and the marker substring is what
|
|
``_PMSET_SLEEP_MARKERS`` searches for.
|
|
"""
|
|
lines = []
|
|
for ts, marker in events:
|
|
lines.append(f"{ts} {marker} Notification clientId=foo")
|
|
return "\n".join(lines) + ("\n" if lines else "")
|
|
|
|
|
|
def _now_pmset_ts(offset_min: int) -> str:
|
|
"""Return a pmset-formatted UTC timestamp ``offset_min`` minutes ago.
|
|
|
|
The real log uses local-time timestamps with explicit offsets; using
|
|
``+0000`` (UTC) here is well-defined and the parser handles any
|
|
offset uniformly.
|
|
"""
|
|
ts = datetime.now(timezone.utc) - timedelta(minutes=offset_min)
|
|
return ts.strftime("%Y-%m-%d %H:%M:%S +0000")
|
|
|
|
|
|
# ---------------------------------------------------------------- HIDIdleTime
|
|
|
|
|
|
def test_hid_idle_time_sec_parses_ioreg_output() -> None:
|
|
"""``HIDIdleTime = 612000000000`` (ns) -> 612 seconds."""
|
|
fake = _completed_process(stdout=_ioreg_stdout(idle_ns=612_000_000_000))
|
|
with patch("iai_mcp.idle_detector.subprocess.run", return_value=fake):
|
|
result = IdleDetector().hid_idle_time_sec()
|
|
assert result == 612
|
|
|
|
|
|
def test_hid_idle_time_sec_returns_none_when_ioreg_missing() -> None:
|
|
"""FileNotFoundError on ioreg -> None (graceful fallback)."""
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=FileNotFoundError(2, "No such file"),
|
|
):
|
|
result = IdleDetector().hid_idle_time_sec()
|
|
assert result is None
|
|
|
|
|
|
def test_hid_idle_time_sec_returns_none_on_nonzero_exit() -> None:
|
|
"""ioreg exits non-zero -> None (treat as unavailable)."""
|
|
fake = _completed_process(stdout="", returncode=1)
|
|
with patch("iai_mcp.idle_detector.subprocess.run", return_value=fake):
|
|
result = IdleDetector().hid_idle_time_sec()
|
|
assert result is None
|
|
|
|
|
|
def test_hid_idle_time_sec_returns_none_on_timeout() -> None:
|
|
"""ioreg timeout -> None (don't block the lifecycle TICK)."""
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=subprocess.TimeoutExpired(cmd=["ioreg"], timeout=5),
|
|
):
|
|
result = IdleDetector().hid_idle_time_sec()
|
|
assert result is None
|
|
|
|
|
|
# ---------------------------------------------------------------- pmset
|
|
|
|
|
|
def test_pmset_recent_sleep_detects_event_within_window() -> None:
|
|
"""Sleep event 2 min ago, window=5 -> True."""
|
|
log = _pmset_log_stdout([
|
|
(_now_pmset_ts(offset_min=2), "System Sleep"),
|
|
])
|
|
fake = _completed_process(stdout=log)
|
|
with patch("iai_mcp.idle_detector.subprocess.run", return_value=fake):
|
|
result = IdleDetector().pmset_recent_sleep(window_min=5)
|
|
assert result is True
|
|
|
|
|
|
def test_pmset_recent_sleep_detects_display_off_event() -> None:
|
|
"""'Display is turned off' marker also counts (per CONTEXT 10.4)."""
|
|
log = _pmset_log_stdout([
|
|
(_now_pmset_ts(offset_min=1), "Display is turned off"),
|
|
])
|
|
fake = _completed_process(stdout=log)
|
|
with patch("iai_mcp.idle_detector.subprocess.run", return_value=fake):
|
|
result = IdleDetector().pmset_recent_sleep(window_min=5)
|
|
assert result is True
|
|
|
|
|
|
def test_pmset_recent_sleep_returns_false_when_no_recent_event() -> None:
|
|
"""Sleep events older than window -> False."""
|
|
log = _pmset_log_stdout([
|
|
(_now_pmset_ts(offset_min=60), "System Sleep"),
|
|
(_now_pmset_ts(offset_min=120), "Display is turned off"),
|
|
])
|
|
fake = _completed_process(stdout=log)
|
|
with patch("iai_mcp.idle_detector.subprocess.run", return_value=fake):
|
|
result = IdleDetector().pmset_recent_sleep(window_min=5)
|
|
assert result is False
|
|
|
|
|
|
def test_pmset_recent_sleep_returns_false_when_pmset_missing() -> None:
|
|
"""FileNotFoundError on pmset -> False (graceful fallback)."""
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=FileNotFoundError(2, "No such file"),
|
|
):
|
|
result = IdleDetector().pmset_recent_sleep()
|
|
assert result is False
|
|
|
|
|
|
# ---------------------------------------------------------------- sleep_eligible disjunction
|
|
|
|
|
|
def test_sleep_eligible_heartbeat_idle_path() -> None:
|
|
"""heartbeat_idle_30min=True alone short-circuits to True.
|
|
|
|
Importantly, the implementation must NOT spawn ioreg/pmset when this
|
|
path triggers — we patch subprocess.run to fail loudly to verify.
|
|
"""
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=AssertionError("must not spawn when heartbeat-idle is True"),
|
|
):
|
|
result = IdleDetector().sleep_eligible(heartbeat_idle_30min=True)
|
|
assert result is True
|
|
|
|
|
|
def test_sleep_eligible_hid_idle_path() -> None:
|
|
"""HIDIdleTime=1900s (>30 min), heartbeat False -> True via HID disjunct."""
|
|
# First call: ioreg returns HIDIdleTime=1900s. Second call would be
|
|
# pmset but should not happen because hid_idle path short-circuits.
|
|
fake_ioreg = _completed_process(
|
|
stdout=_ioreg_stdout(idle_ns=1900 * 1_000_000_000)
|
|
)
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
return_value=fake_ioreg,
|
|
) as run_mock:
|
|
result = IdleDetector().sleep_eligible(heartbeat_idle_30min=False)
|
|
assert result is True
|
|
# ioreg called once; pmset must NOT have been called (short-circuit).
|
|
assert run_mock.call_count == 1
|
|
|
|
|
|
def test_sleep_eligible_pmset_path() -> None:
|
|
"""heartbeat False, HID below threshold, pmset event recent -> True."""
|
|
fake_ioreg = _completed_process(
|
|
stdout=_ioreg_stdout(idle_ns=10 * 1_000_000_000) # 10s -- below threshold
|
|
)
|
|
fake_pmset = _completed_process(
|
|
stdout=_pmset_log_stdout([
|
|
(_now_pmset_ts(offset_min=2), "System Sleep"),
|
|
])
|
|
)
|
|
# subprocess.run is called twice: ioreg, then pmset.
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=[fake_ioreg, fake_pmset],
|
|
):
|
|
result = IdleDetector().sleep_eligible(heartbeat_idle_30min=False)
|
|
assert result is True
|
|
|
|
|
|
def test_sleep_eligible_all_false() -> None:
|
|
"""All three disjuncts False -> overall False."""
|
|
fake_ioreg = _completed_process(
|
|
stdout=_ioreg_stdout(idle_ns=10 * 1_000_000_000)
|
|
)
|
|
fake_pmset = _completed_process(stdout="") # empty log
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=[fake_ioreg, fake_pmset],
|
|
):
|
|
result = IdleDetector().sleep_eligible(heartbeat_idle_30min=False)
|
|
assert result is False
|
|
|
|
|
|
# ---------------------------------------------------------------- status() snapshot
|
|
|
|
|
|
def test_status_for_doctor_row_all_signals_available() -> None:
|
|
"""status() reports both signals when ioreg + pmset both succeed.
|
|
|
|
Three subprocess.run calls expected:
|
|
1. ioreg (hid_idle_time_sec)
|
|
2. pmset -g log (pmset_recent_sleep)
|
|
3. pmset -g (responsiveness probe inside _pmset_responsive)
|
|
"""
|
|
fake_ioreg = _completed_process(
|
|
stdout=_ioreg_stdout(idle_ns=42 * 1_000_000_000)
|
|
)
|
|
fake_pmset_log = _completed_process(stdout="")
|
|
fake_pmset_g = _completed_process(stdout="Now drawing from 'AC Power'\n")
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=[fake_ioreg, fake_pmset_log, fake_pmset_g],
|
|
):
|
|
status = IdleDetector().status()
|
|
|
|
assert isinstance(status, IdleStatus)
|
|
assert status.hid_idle_sec == 42
|
|
assert status.pmset_recent_sleep is False
|
|
assert "HIDIdleTime" in status.available_signals
|
|
assert "pmset" in status.available_signals
|
|
|
|
|
|
def test_status_when_signals_missing() -> None:
|
|
"""All subprocess calls fail -> available_signals == []."""
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run",
|
|
side_effect=FileNotFoundError(2, "No such file"),
|
|
):
|
|
status = IdleDetector().status()
|
|
assert status.hid_idle_sec is None
|
|
assert status.pmset_recent_sleep is False
|
|
assert status.available_signals == []
|
|
|
|
|
|
# ---------------------------------------------------------------- subprocess hardening
|
|
|
|
|
|
def test_subprocess_uses_array_form_not_shell() -> None:
|
|
"""Verify all subprocess calls use array form with shell=False.
|
|
|
|
Captures the actual ``args`` and ``kwargs`` passed to ``subprocess.run``
|
|
and asserts:
|
|
- ``args[0]`` (the command) is a list, not a string.
|
|
- ``kwargs.get("shell", False)`` is False (or missing).
|
|
- A finite ``timeout`` is set on every call.
|
|
"""
|
|
fake = _completed_process(stdout="")
|
|
captured: list[tuple[tuple, dict]] = []
|
|
|
|
def _capture(*args, **kwargs):
|
|
captured.append((args, kwargs))
|
|
return fake
|
|
|
|
with patch(
|
|
"iai_mcp.idle_detector.subprocess.run", side_effect=_capture
|
|
):
|
|
IdleDetector().status()
|
|
|
|
assert len(captured) >= 1
|
|
for args, kwargs in captured:
|
|
# First positional arg is the command. Must be a list.
|
|
assert isinstance(args[0], list), (
|
|
f"subprocess.run command must be a list (array form), got: {args[0]!r}"
|
|
)
|
|
# shell defaults to False when unset; explicitly assert it's not True.
|
|
assert kwargs.get("shell", False) is False, (
|
|
"subprocess.run must NOT use shell=True (PATH-injection risk)"
|
|
)
|
|
# Timeout must be set so a hung tool can't block the TICK.
|
|
assert "timeout" in kwargs and kwargs["timeout"] > 0, (
|
|
f"subprocess.run must set a finite timeout, got: {kwargs}"
|
|
)
|