Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: XNLLLLH <XNLLLLH@users.noreply.github.com>
165 lines
5.7 KiB
Python
165 lines
5.7 KiB
Python
"""Tests for iai-mcp audit CLI (OPS-07 Plan 02-05).

`iai-mcp audit [--since WEEKS] [--severity SEV]` renders an identity-event
audit log, TZ-aware timestamps, and REDACTED shield match counts (D-30
threat T-02-05-02: leaking matched patterns in CLI output would hand the
attacker a dictionary of what the shield is watching for).
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
|
|
from iai_mcp.cli import main as cli_main
|
|
from iai_mcp.events import write_event
|
|
from iai_mcp.store import MemoryStore
|
|
|
|
|
|
def test_cli_audit_empty(tmp_path, capsys, monkeypatch):
    """With nothing written to the store, `audit` exits 0 and says so.

    The store location is supplied through the ``IAI_MCP_STORE`` environment
    variable, pointed at a fresh temporary directory.
    """
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))

    exit_code = cli_main(["audit"])
    assert exit_code == 0

    # Either wording of the "nothing to show" notice is accepted,
    # compared case-insensitively.
    lowered = capsys.readouterr().out.lower()
    assert any(
        phrase in lowered for phrase in ("no identity events", "no events")
    )
|
|
|
|
|
|
def test_cli_audit_renders_events(tmp_path, capsys, monkeypatch):
    """A stored event's kind and severity both appear in `audit` output.

    Only the kind and severity strings are asserted here; timestamp
    rendering is not checked by this test.
    """
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))
    event_store = MemoryStore(path=tmp_path)
    payload = {"anchor_id": "abc", "new_record_id": "def"}
    write_event(
        event_store,
        kind="s5_invariant_update",
        data=payload,
        severity="info",
        session_id="s1",
    )

    exit_code = cli_main(["audit"])
    assert exit_code == 0

    rendered = capsys.readouterr().out
    # First the event kind, then its severity label.
    for expected in ("s5_invariant_update", "info"):
        assert expected in rendered
|
|
|
|
|
|
def test_cli_audit_since_weeks(tmp_path, capsys, monkeypatch):
    """`audit --since=2` runs to completion and exits 0.

    This is a smoke test: one event is seeded and only the exit code is
    checked, not the rendered output.
    """
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))
    event_store = MemoryStore(path=tmp_path)
    write_event(
        event_store,
        kind="s5_invariant_update",
        data={"anchor_id": "abc"},
        severity="info",
        session_id="s1",
    )

    exit_code = cli_main(["audit", "--since=2"])
    assert exit_code == 0
|
|
|
|
|
|
def test_cli_audit_severity_filter_warning_only(tmp_path, capsys, monkeypatch):
    """`--severity=warning` shows the warning event and hides the info one."""
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))
    event_store = MemoryStore(path=tmp_path)

    # (kind, data, severity, session_id) for each seeded event, in write order.
    seeded_events = (
        ("s5_invariant_update", {"anchor_id": "abc"}, "info", "s1"),
        (
            "s5_drift_alert",
            {"first_value": 0.1, "last_value": 0.5},
            "warning",
            "s2",
        ),
    )
    for kind, data, severity, session_id in seeded_events:
        write_event(
            event_store,
            kind=kind,
            data=data,
            severity=severity,
            session_id=session_id,
        )

    exit_code = cli_main(["audit", "--severity=warning"])
    assert exit_code == 0

    rendered = capsys.readouterr().out
    # The warning-severity kind must be listed ...
    assert "s5_drift_alert" in rendered
    # ... while the info-severity kind must have been filtered away.
    assert "s5_invariant_update" not in rendered
|
|
|
|
|
|
def test_cli_audit_shows_shield_rejections_redacted(tmp_path, capsys, monkeypatch):
    """shield_rejection events appear, but matched patterns are redacted.

    The CLI may print how many patterns matched, never the literal pattern
    text. Every seeded pattern is checked for absence, so a leak of any one
    of them fails the test.
    """
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))
    store = MemoryStore(path=tmp_path)
    # Single source of truth for both the seeded payload and the assertions.
    matched_patterns = ("forget", "you are now", "override")
    write_event(
        store,
        kind="shield_rejection",
        data={
            "tier": "hard_block",
            "matched": list(matched_patterns),
            "record_id": "aabbcc",
            "action": "reject",
        },
        severity="critical",
        session_id="s1",
    )

    code = cli_main(["audit"])
    assert code == 0
    out = capsys.readouterr().out

    # The event kind is visible.
    assert "shield_rejection" in out

    # The match COUNT is visible.
    # NOTE(review): a bare digit check is loose (other output such as a
    # timestamp may contain the same digit); tighten it once the exact
    # output format is pinned.
    assert str(len(matched_patterns)) in out

    # No literal signal word may appear in the output (redaction).
    for pattern in matched_patterns:
        assert pattern not in out, (
            f"shield pattern leaked into audit output: {pattern!r}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------- subcommands
|
|
|
|
|
|
def test_cli_audit_shield_subcommand(tmp_path, capsys, monkeypatch):
    """`iai-mcp audit shield --since=7` exits 0 with a shield event stored.

    Only the exit code is asserted; the rendered output is not inspected.
    """
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))
    event_store = MemoryStore(path=tmp_path)
    rejection = {"tier": "hard_block", "matched": ["forget"], "action": "reject"}
    write_event(
        event_store,
        kind="shield_rejection",
        data=rejection,
        severity="critical",
        session_id="s1",
    )

    # Running the subcommand without a crash is the whole contract here.
    exit_code = cli_main(["audit", "shield", "--since=7"])
    assert exit_code == 0
|
|
|
|
|
|
def test_cli_audit_drift_subcommand(tmp_path, capsys, monkeypatch):
    """`iai-mcp audit drift` exits 0 and mentions drift in its output."""
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))
    metric_store = MemoryStore(path=tmp_path)

    # Strictly increasing "m4" metric values, one per session, intended to
    # make the drift detection fire.
    rising_values = (0.1, 0.2, 0.3, 0.4, 0.5)
    for index, value in enumerate(rising_values):
        write_event(
            metric_store,
            kind="trajectory_metric",
            data={"metric": "m4", "value": value},
            severity="info",
            session_id=f"s{index}",
        )

    exit_code = cli_main(["audit", "drift"])
    assert exit_code == 0

    # The word "drift" must show up somewhere, in any letter case.
    lowered = capsys.readouterr().out.lower()
    assert "drift" in lowered
|
|
|
|
|
|
def test_cli_audit_identity_subcommand(tmp_path, capsys, monkeypatch):
    """`iai-mcp audit identity` lists the s5_* event and omits the shield one."""
    monkeypatch.setenv("IAI_MCP_STORE", str(tmp_path))
    event_store = MemoryStore(path=tmp_path)

    # (kind, data, severity, session_id) for each seeded event, in write order.
    seeded_events = (
        ("s5_invariant_update", {"anchor_id": "abc"}, "info", "s1"),
        (
            "shield_rejection",
            {"tier": "hard_block", "matched": ["forget"], "action": "reject"},
            "critical",
            "s2",
        ),
    )
    for kind, data, severity, session_id in seeded_events:
        write_event(
            event_store,
            kind=kind,
            data=data,
            severity=severity,
            session_id=session_id,
        )

    exit_code = cli_main(["audit", "identity"])
    assert exit_code == 0

    rendered = capsys.readouterr().out
    # The identity (s5_*) event is shown ...
    assert "s5_invariant_update" in rendered
    # ... and the shield event is excluded from this view.
    assert "shield_rejection" not in rendered
|