Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: XNLLLLH <XNLLLLH@users.noreply.github.com>
575 lines
21 KiB
Python
575 lines
21 KiB
Python
"""Tests for R8 — cleanup migration safety.
|
||
|
||
Locked decisions covered (06-CONTEXT.md):
- top-level `iai-mcp schema-cleanup` subcommand with `[--dry-run] [--apply]
  [--store-path PATH]`. Default mode is `--dry-run` (Beer VSM S2 reversibility).
- `tier="semantic_pruned"` records remain in store indefinitely.
- `SEMANTIC_PRUNED_TIER` constant in `src/iai_mcp/types.py`.
- snapshot directory naming `~/.iai-mcp/lancedb-pre-cleanup-YYYYMMDDTHHMMSSZ`
  (UTC ISO-8601 basic format, no colons; filesystem-safe macOS + Linux).
- pytest under `tests/` (single file: `test_migrate_cleanup.py`).

R8 acceptance (06-SPEC.md): N=12 known duplicates across 4 patterns →
`--dry-run` reports the diff without mutating; `--apply` snapshots
the LanceDB tables BEFORE any write, soft-deletes via tier rename to
`semantic_pruned`, reinforces incoming `schema_instance_of` edges
onto the keeper, emits `schema_cleanup_run` event, and is idempotent
(re-running on the migrated store reports zero changes).
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from uuid import uuid4
|
||
|
||
import pytest
|
||
|
||
|
||
# ---------------------------------------------------------------- helpers
|
||
|
||
|
||
def _rec(
    *,
    text: str = "t",
    tags: list[str] | None = None,
    language: str = "en",
    tier: str = "semantic",
    detail_level: int = 2,
    created_at: datetime | None = None,
):
    """Return a new ``MemoryRecord`` filled in with fixture-friendly defaults.

    The embedding is a hand-built vector (1.0 followed by zeros), so no
    embedder has to be loaded to construct a record.  ``created_at`` and
    ``updated_at`` are both set to the supplied ``created_at`` or, when none is
    given, to the current UTC time.  ``tags`` is copied into a fresh list.
    """
    from iai_mcp.types import EMBED_DIM, MemoryRecord

    stamp = created_at if created_at else datetime.now(timezone.utc)
    unit_vector = [1.0, *([0.0] * (EMBED_DIM - 1))]
    tag_list = list(tags) if tags else []

    fields = dict(
        id=uuid4(),
        tier=tier,
        literal_surface=text,
        aaak_index="",
        embedding=unit_vector,
        community_id=None,
        centrality=0.0,
        detail_level=detail_level,
        pinned=False,
        stability=0.0,
        difficulty=0.0,
        last_reviewed=None,
        never_decay=False,
        never_merge=False,
        provenance=[],
        created_at=stamp,
        updated_at=stamp,
        tags=tag_list,
        language=language,
    )
    return MemoryRecord(**fields)
|
||
|
||
|
||
@pytest.fixture(autouse=True)
def _patch_embedder(monkeypatch):
    """Replace ``iai_mcp.embed.Embedder`` with a stub for every test in this module.

    The stub returns the same fixed vector (1.0 followed by zeros) for any
    text, so code that instantiates ``Embedder`` through the patched attribute
    gets this stand-in instead.  ``monkeypatch`` restores the original
    attribute when the test finishes.
    """
    from iai_mcp import embed as embed_mod
    from iai_mcp.types import EMBED_DIM

    def _unit_vector() -> list[float]:
        return [1.0, *([0.0] * (EMBED_DIM - 1))]

    class _FakeEmbedder:
        DIM = DEFAULT_DIM = EMBED_DIM
        DEFAULT_MODEL_KEY = "fake"

        def __init__(self, *args, **kwargs):
            # Accept and ignore whatever arguments callers pass in.
            self.DIM = EMBED_DIM

        def embed(self, text: str) -> list[float]:
            return _unit_vector()

        def embed_batch(self, texts):
            return [self.embed(item) for item in texts]

    monkeypatch.setattr(embed_mod, "Embedder", _FakeEmbedder)
    yield
|
||
|
||
|
||
# ---------------------------------------------------------------- Task 1: SEMANTIC_PRUNED_TIER constant + TIER_ENUM extension
|
||
|
||
|
||
def test_semantic_pruned_tier_constant_and_enum_membership():
    """The pruned-tier constant has the expected value and is a member of TIER_ENUM."""
    from iai_mcp.types import SEMANTIC_PRUNED_TIER, TIER_ENUM

    wrong_value_msg = (
        "D-09 mandates the constant value 'semantic_pruned' (used as a soft-delete "
        "sentinel by cleanup_schema_duplicates)."
    )
    not_in_enum_msg = (
        "TIER_ENUM must include 'semantic_pruned' so MemoryRecord.__post_init__ "
        "tier validation accepts pruned rows when reading them back from the store."
    )

    assert SEMANTIC_PRUNED_TIER == "semantic_pruned", wrong_value_msg
    assert SEMANTIC_PRUNED_TIER in TIER_ENUM, not_in_enum_msg
|
||
|
||
|
||
def test_memoryrecord_accepts_semantic_pruned_tier():
    """A record built with tier='semantic_pruned' constructs and keeps that tier."""
    # Construction itself is the check: it must complete without raising.
    pruned = _rec(text="pruned dup", tier="semantic_pruned")
    assert pruned.tier == "semantic_pruned"
|
||
|
||
|
||
def test_memoryrecord_existing_tiers_still_accepted():
    """Control case: each of the five previously valid tiers still constructs."""
    prior_tiers = ("working", "episodic", "semantic", "procedural", "parametric")
    for name in prior_tiers:
        built = _rec(tier=name, text=f"t-{name}")
        assert built.tier == name
|
||
|
||
|
||
def test_memoryrecord_invalid_tier_still_raises():
    """Control case: an unknown tier string is rejected with ValueError('invalid tier…')."""
    expect_rejection = pytest.raises(ValueError, match="invalid tier")
    with expect_rejection:
        _rec(tier="garbage")
|
||
|
||
|
||
# ---------------------------------------------------------------- Task 2: cleanup_schema_duplicates callable
|
||
|
||
|
||
def _seed_dup_store(
    tmp_path: Path,
    n_per_pattern: int = 4,
    n_patterns: int = 3,
    extra_singletons: int = 0,
):
    """Create a ``MemoryStore`` at ``tmp_path`` pre-loaded with duplicate schema rows.

    Rows are written with ``store.insert`` directly rather than through
    ``persist_schema``: per the original author's note, ``persist_schema`` was
    made idempotent and would refuse to create the duplicate state these tests
    need.  The cleanup under test is described there as a one-shot recovery
    for stores that accumulated duplicates before that change.

    For each of ``n_patterns`` patterns, ``n_per_pattern`` ``tier="semantic"``
    rows are inserted, tagged ``pattern:<pattern>`` and ``schema``.  Their
    ``created_at`` grows by one second per row, so the first row inserted for
    a pattern is also its oldest.  Every schema row (not merely every group)
    is paired with a freshly inserted episodic evidence record, and one
    ``schema_instance_of`` edge is boosted for that (evidence, schema) pair so
    the edge-reinforcement assertions have data to count.

    ``extra_singletons`` further patterns receive exactly one schema row each
    and no edge.

    Args:
        tmp_path: Directory handed to ``MemoryStore(path=...)``.
        n_per_pattern: Schema rows inserted per duplicate pattern.
        n_patterns: Number of duplicate patterns.
        extra_singletons: Number of additional single-row patterns.

    Returns:
        ``(store, patterns)`` where ``patterns`` lists the pattern strings,
        duplicate patterns first and singleton patterns after them.
    """
    from iai_mcp.store import MemoryStore

    store = MemoryStore(path=tmp_path)
    base = datetime.now(timezone.utc)
    patterns: list[str] = []

    for p_idx in range(n_patterns):
        pattern = f"tags:capture+role:user+p{p_idx}"
        patterns.append(pattern)
        # Oldest first, so the first insert of each pattern is the expected keeper.
        for s_idx in range(n_per_pattern):
            sched_at = base + timedelta(seconds=p_idx * 60 + s_idx)
            sch = _rec(
                text=f"schema-p{p_idx}-i{s_idx}",
                tier="semantic",
                tags=[f"pattern:{pattern}", "schema"],
                created_at=sched_at,
            )
            store.insert(sch)

            # One schema_instance_of edge per schema row, so every row has an
            # incident edge that must end up on the keeper after cleanup.
            ev = _rec(
                text=f"ev-p{p_idx}-i{s_idx}",
                tier="episodic",
                tags=["capture", "role:user"],
                created_at=sched_at,
            )
            store.insert(ev)
            store.boost_edges(
                [(ev.id, sch.id)],
                edge_type="schema_instance_of",
                delta=0.1,
            )

    # Single-record patterns: the cleanup is expected to leave these alone.
    for s_idx in range(extra_singletons):
        pattern = f"singleton-p{s_idx}"
        patterns.append(pattern)
        sch = _rec(
            text=f"singleton-{s_idx}",
            tier="semantic",
            tags=[f"pattern:{pattern}", "schema"],
            created_at=base + timedelta(seconds=10000 + s_idx),
        )
        store.insert(sch)

    return store, patterns
|
||
|
||
|
||
def _count_semantic_pattern_records(store, pattern_tag_prefix: str = "pattern:") -> int:
    """Count records at tier 'semantic' carrying at least one tag with the given prefix.

    A record whose ``tags`` is ``None`` or empty never matches.  Each record is
    counted at most once, however many of its tags match.
    """
    matches = 0
    for record in store.all_records():
        if record.tier != "semantic":
            continue
        for tag in record.tags or []:
            if tag.startswith(pattern_tag_prefix):
                matches += 1
                break
    return matches
|
||
|
||
|
||
def _count_pruned(store) -> int:
    """Count the records in ``store`` whose tier is 'semantic_pruned'."""
    pruned_rows = [row for row in store.all_records() if row.tier == "semantic_pruned"]
    return len(pruned_rows)
|
||
|
||
|
||
def test_cleanup_dry_run_does_not_mutate_store(tmp_path):
    """Dry-run returns the planned counts and leaves records and the root listing unchanged."""
    from iai_mcp.migrate import cleanup_schema_duplicates

    def _root_listing() -> list[str]:
        # Names of everything directly under the store root, in sorted order.
        return sorted(entry.name for entry in tmp_path.iterdir())

    store, _patterns = _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)

    # Baseline: 3 patterns x 4 duplicates each, nothing pruned yet.
    pre_semantic = _count_semantic_pattern_records(store)
    pre_pruned = _count_pruned(store)
    assert pre_semantic == 12
    assert pre_pruned == 0

    # Listing taken before the run, to detect any newly created
    # `lancedb-pre-cleanup-*` directory afterwards.
    siblings_pre = _root_listing()

    summary = cleanup_schema_duplicates(store, apply=False)

    # 3 groups, one keeper each, and (4 - 1) prunable rows per group.
    for key, want in (
        ("mode", "dry-run"),
        ("groups", 3),
        ("keepers", 3),
        ("pruned", 9),
    ):
        assert summary[key] == want
    assert summary["snapshot_dir"] is None

    # Record tiers are exactly as seeded.
    assert _count_semantic_pattern_records(store) == 12
    assert _count_pruned(store) == 0

    # Nothing new appeared next to the store.
    siblings_post = _root_listing()
    assert siblings_pre == siblings_post, (
        f"--dry-run must not create any sibling directories; saw new entries: "
        f"{set(siblings_post) - set(siblings_pre)}"
    )
|
||
|
||
|
||
def test_cleanup_apply_creates_snapshot_directory_before_writes(tmp_path):
    """Apply mode reports a snapshot directory with the expected place, name and contents.

    Checked here: the directory exists directly under ``store.root``, its name
    is ``lancedb-pre-cleanup-`` plus a 16-character suffix ending in ``Z``, and
    it contains the three ``.lance`` table directories at its top level.

    NOTE(review): this test inspects the snapshot after the run; it does not
    itself observe that the copy happened before the soft-deletes.
    """
    from iai_mcp.migrate import cleanup_schema_duplicates

    prefix = "lancedb-pre-cleanup-"

    store, _patterns = _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)
    summary = cleanup_schema_duplicates(store, apply=True)

    assert summary["mode"] == "apply"
    reported = summary["snapshot_dir"]
    assert reported is not None

    snap = Path(reported)
    assert snap.exists() and snap.is_dir()

    # Location and name: a child of the store root carrying the fixed prefix.
    assert snap.parent == Path(store.root)
    assert snap.name.startswith(prefix)

    # Timestamp suffix shaped like YYYYMMDDTHHMMSSZ (16 characters).
    suffix = snap.name[len(prefix):]
    assert len(suffix) == 16 and suffix.endswith("Z")

    # The snapshot must hold the table directories at its top level.
    snap_entries = {child.name for child in snap.iterdir()}
    for table_dir in ("records.lance", "events.lance", "edges.lance"):
        assert table_dir in snap_entries, snap_entries
|
||
|
||
|
||
def test_cleanup_apply_soft_deletes_duplicates_via_tier_rename(tmp_path):
    """After apply, 3 pattern rows remain at 'semantic' and 9 sit at 'semantic_pruned'."""
    from iai_mcp.migrate import cleanup_schema_duplicates

    store, _patterns = _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)
    cleanup_schema_duplicates(store, apply=True)

    remaining_semantic = _count_semantic_pattern_records(store)
    now_pruned = _count_pruned(store)

    # One keeper for each of the 3 patterns.
    assert remaining_semantic == 3
    # The other 3 rows of each pattern were re-tiered: 3 x 3 = 9.
    assert now_pruned == 9
|
||
|
||
|
||
def test_cleanup_apply_reinforces_edges_onto_keeper(tmp_path):
    """After apply, each keeper is incident to 4 schema_instance_of edges."""
    from iai_mcp.migrate import cleanup_schema_duplicates
    from iai_mcp.store import EDGES_TABLE

    edge_kind = "schema_instance_of"

    store, patterns = _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)

    # Before cleanup: one edge per schema row, 3 patterns x 4 rows = 12.
    edges_pre = store.db.open_table(EDGES_TABLE).to_pandas()
    sio_mask_pre = edges_pre["edge_type"] == edge_kind
    pre_total_sio = int(sio_mask_pre.sum())
    assert pre_total_sio == 12

    # Work out the oldest semantic row per pattern up front, so the keepers
    # can be looked up again once the cleanup has run.
    pattern_to_keeper_id = {}
    for p in patterns:
        wanted_tag = f"pattern:{p}"
        candidates = [
            r
            for r in store.all_records()
            if r.tier == "semantic" and wanted_tag in (r.tags or [])
        ]
        candidates.sort(key=lambda r: r.created_at)
        if candidates:
            pattern_to_keeper_id[p] = candidates[0].id

    summary = cleanup_schema_duplicates(store, apply=True)
    # At least one reinforced edge for each of the 9 duplicates.
    assert summary["edges_reinforced"] >= 9

    edges_post = store.db.open_table(EDGES_TABLE).to_pandas()
    for pattern, keeper_id in pattern_to_keeper_id.items():
        keeper_str = str(keeper_id)
        of_kind = edges_post["edge_type"] == edge_kind
        touches_keeper = (edges_post["dst"] == keeper_str) | (
            edges_post["src"] == keeper_str
        )
        sio = edges_post[of_kind & touches_keeper]
        # The keeper's own edge plus the three moved over from its duplicates.
        assert len(sio) == 4, (
            f"pattern {pattern!r}: keeper {keeper_str[:8]} should have 4 "
            f"schema_instance_of edges (1 original + 3 redirected from dups), "
            f"got {len(sio)}"
        )
|
||
|
||
|
||
def test_cleanup_apply_keeper_is_oldest_per_pattern(tmp_path):
    """For every pattern, the single surviving 'semantic' row is the oldest one."""
    from iai_mcp.migrate import cleanup_schema_duplicates

    store, patterns = _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)

    def _semantic_rows(pattern):
        # Rows currently at tier 'semantic' that carry this pattern's tag.
        tag = f"pattern:{pattern}"
        return [
            r
            for r in store.all_records()
            if r.tier == "semantic" and tag in (r.tags or [])
        ]

    # Record the oldest row of each pattern before anything is modified.
    expected_keeper_ids = {}
    for p in patterns:
        by_age = sorted(_semantic_rows(p), key=lambda r: r.created_at)
        expected_keeper_ids[p] = by_age[0].id

    cleanup_schema_duplicates(store, apply=True)

    # Each pattern must be left with exactly one semantic row: the one
    # identified above.
    for p, expected_id in expected_keeper_ids.items():
        survivors = _semantic_rows(p)
        assert len(survivors) == 1, (
            f"pattern {p!r}: expected exactly 1 keeper, got {len(survivors)}"
        )
        assert survivors[0].id == expected_id, (
            f"pattern {p!r}: keeper should be the oldest record "
            f"({str(expected_id)[:8]}), got {str(survivors[0].id)[:8]}"
        )
|
||
|
||
|
||
def test_cleanup_apply_skips_single_record_groups(tmp_path):
    """Patterns with only one schema row are neither counted as groups nor pruned."""
    from iai_mcp.migrate import cleanup_schema_duplicates

    store, _patterns = _seed_dup_store(
        tmp_path,
        n_per_pattern=4,
        n_patterns=2,
        extra_singletons=2,
    )
    # Seeded: 2 patterns x 4 rows, plus 2 singleton rows.
    assert _count_semantic_pattern_records(store) == 10

    summary = cleanup_schema_duplicates(store, apply=True)

    # Only the two duplicated patterns count; 3 rows pruned from each.
    for key, want in (("groups", 2), ("keepers", 2), ("pruned", 6)):
        assert summary[key] == want

    # Left at 'semantic': the 2 keepers and the 2 untouched singletons.
    assert _count_semantic_pattern_records(store) == 4
|
||
|
||
|
||
def test_cleanup_emits_schema_cleanup_run_event(tmp_path):
    """Apply mode records a schema_cleanup_run event whose data carries the summary."""
    from iai_mcp.events import query_events
    from iai_mcp.migrate import cleanup_schema_duplicates

    store, _patterns = _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)
    cleanup_schema_duplicates(store, apply=True)

    events = query_events(store, kind="schema_cleanup_run")
    assert len(events) >= 1, "schema_cleanup_run event must be emitted"

    # Inspect the first event returned by the query.
    payload = events[0]["data"]

    required_keys = (
        "mode",
        "groups",
        "keepers",
        "pruned",
        "edges_reinforced",
        "snapshot_dir",
    )
    for required_key in required_keys:
        assert required_key in payload, (
            f"schema_cleanup_run event payload missing '{required_key}'"
        )

    for field, want in (
        ("mode", "apply"),
        ("groups", 3),
        ("keepers", 3),
        ("pruned", 9),
    ):
        assert payload[field] == want
|
||
|
||
|
||
def test_cleanup_apply_is_idempotent_on_second_run(tmp_path):
    """A second apply run on an already cleaned store reports zero groups, keepers and pruned."""
    from iai_mcp.migrate import cleanup_schema_duplicates

    store, _patterns = _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)

    # Run 1 does the real work.
    summary1 = cleanup_schema_duplicates(store, apply=True)
    assert summary1["groups"] == 3
    assert summary1["pruned"] == 9

    # Run 2: the nine duplicates now sit at tier 'semantic_pruned', so only
    # one 'semantic' row per pattern is left and no group has duplicates.
    summary2 = cleanup_schema_duplicates(store, apply=True)
    assert summary2["groups"] == 0, (
        f"second --apply must report 0 groups (idempotent), got {summary2}"
    )
    assert summary2["keepers"] == 0
    assert summary2["pruned"] == 0

    # Record counts are the same as after run 1.
    assert _count_semantic_pattern_records(store) == 3
    assert _count_pruned(store) == 9
|
||
|
||
|
||
# ---------------------------------------------------------------- Task 3: iai-mcp schema-cleanup CLI subcommand
|
||
|
||
|
||
def _run_cli(argv: list[str]) -> tuple[int, str]:
    """Run ``iai_mcp.cli.main(argv)`` with stdout captured.

    Args:
        argv: Command-line arguments handed straight to ``main``.

    Returns:
        ``(exit_code, captured_stdout)``.  When ``main`` returns normally its
        return value is used as the exit code unchanged.  When it raises
        ``SystemExit`` the code is mapped the way the interpreter maps it:
        ``None`` becomes 0, an integer is kept, and any other object (for
        example a message string passed to ``sys.exit``) becomes 1.

    Only stdout is redirected; anything written to stderr is not part of the
    returned text.
    """
    import io
    from contextlib import redirect_stdout

    from iai_mcp.cli import main

    buf = io.StringIO()
    with redirect_stdout(buf):
        try:
            code = main(argv)
        except SystemExit as exc:
            # argparse (and sys.exit in general) leaves via SystemExit.
            raw = exc.code
            if raw is None:
                code = 0
            elif isinstance(raw, int):
                code = int(raw)
            else:
                # Non-integer payloads mean "exit status 1"; calling int() on
                # a message string would raise ValueError and hide the real
                # CLI failure behind an unrelated traceback.
                code = 1
    return code, buf.getvalue()
|
||
|
||
|
||
def test_cli_schema_cleanup_default_is_dry_run(tmp_path):
    """With neither --dry-run nor --apply given, the CLI exits 0 and prints a '[dry-run]' report."""
    _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)

    argv = ["schema-cleanup", "--store-path", str(tmp_path)]
    code, out = _run_cli(argv)

    assert code == 0, f"CLI exited non-zero: {code!r}; output:\n{out}"
    assert "[dry-run]" in out, (
        f"default mode must report '[dry-run]' header; got:\n{out}"
    )
    # The summary labels must all be visible in the printed report.
    for label in ("groups", "keepers", "pruned"):
        assert label in out
|
||
|
||
|
||
def test_cli_schema_cleanup_apply_runs_end_to_end(tmp_path):
    """`schema-cleanup --apply` exits 0, mentions the snapshot, and re-tiers the duplicates."""
    from iai_mcp.store import MemoryStore

    _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=3)

    argv = ["schema-cleanup", "--apply", "--store-path", str(tmp_path)]
    code, out = _run_cli(argv)

    assert code == 0, f"CLI exited non-zero: {code!r}; output:\n{out}"
    assert "[apply]" in out
    assert "snapshot" in out.lower()

    # Open the store afresh from disk to confirm the change was persisted.
    reopened = MemoryStore(path=tmp_path)
    assert _count_semantic_pattern_records(reopened) == 3
    assert _count_pruned(reopened) == 9
|
||
|
||
|
||
def test_cli_schema_cleanup_dry_run_and_apply_mutually_exclusive(tmp_path):
    """Passing both --dry-run and --apply yields a non-zero exit code."""
    _seed_dup_store(tmp_path, n_per_pattern=4, n_patterns=2)

    conflicting_argv = ["schema-cleanup", "--dry-run", "--apply"]
    conflicting_argv += ["--store-path", str(tmp_path)]

    code, _out = _run_cli(conflicting_argv)
    assert code != 0, (
        "--dry-run and --apply must be mutually exclusive (argparse-rejected)"
    )
|
||
|
||
|
||
def test_cli_schema_cleanup_honours_store_path_argument(tmp_path):
    """Running the CLI against one store directory leaves a sibling store untouched."""
    # Two roots in the same temp tree: "a" is seeded with duplicates, "b" is
    # only an empty directory.
    store_a_root = tmp_path / "a"
    store_b_root = tmp_path / "b"
    for root in (store_a_root, store_b_root):
        root.mkdir()

    _seed_dup_store(store_a_root, n_per_pattern=4, n_patterns=2)

    # Point the (default dry-run) cleanup at the empty root.
    argv_b = ["schema-cleanup", "--store-path", str(store_b_root)]
    code, out_b = _run_cli(argv_b)
    assert code == 0
    # NOTE(review): a bare "0" substring check is weak; it would also match a
    # "0" appearing anywhere else in the output. Consider asserting on the
    # exact summary line once the output format is confirmed.
    assert "0" in out_b

    # The seeded store was never targeted, so all 2 x 4 rows are still semantic.
    from iai_mcp.store import MemoryStore

    store_a = MemoryStore(path=store_a_root)
    assert _count_semantic_pattern_records(store_a) == 8
|
||
|
||
|
||
def test_cli_schema_cleanup_argparse_contract():
    """Parsed namespace for schema-cleanup exposes cmd, apply, dry_run and store_path."""
    from iai_mcp.cli import _build_parser

    parser = _build_parser()

    def _parse(*flags):
        # Parse the subcommand followed by whichever flags the case needs.
        return parser.parse_args(["schema-cleanup", *flags])

    # --apply alone: apply on, dry_run off, store path left unset.
    ns = _parse("--apply")
    assert ns.cmd == "schema-cleanup"
    assert ns.apply is True
    assert ns.dry_run is False
    # With no --store-path the value is None; the original comment notes that
    # cmd_schema_cleanup then falls back to ~/.iai-mcp.
    assert ns.store_path is None

    # --dry-run alone: the mirror image.
    ns2 = _parse("--dry-run")
    assert ns2.dry_run is True
    assert ns2.apply is False

    # Only --store-path: the path is passed through verbatim and both mode
    # flags stay False. The original comment notes that cmd_schema_cleanup
    # treats that combination as the dry-run default.
    ns3 = _parse("--store-path", "/tmp/foo")
    assert ns3.store_path == "/tmp/foo"
    assert ns3.apply is False
    assert ns3.dry_run is False
|