From 382b411133e2a8d3dcfebae432c51a753a86b0ec Mon Sep 17 00:00:00 2001 From: Apunkt Date: Sat, 16 May 2026 12:08:28 +0200 Subject: [PATCH] feat(maintenance): add dedup-edges subcommand to clean Qdrant duplicates Cleans up duplicate edge points left by the boost_edges() bug (fixed in previous commit). Each unique (src, dst, edge_type) keeps the highest-weight point; duplicates are deleted. Default --dry-run. --- src/iai_mcp/cli.py | 163 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/src/iai_mcp/cli.py b/src/iai_mcp/cli.py index 7f4aff7..768cdc5 100644 --- a/src/iai_mcp/cli.py +++ b/src/iai_mcp/cli.py @@ -2395,6 +2395,132 @@ def cmd_maintenance_sleep_cycle(args: argparse.Namespace) -> int: return 0 +def cmd_maintenance_dedup_edges(args: argparse.Namespace) -> int: + """Qdrant: remove duplicate edge points from the metadata collection. + + Each unique (src, dst, edge_type) should have exactly one point. + For duplicates, keeps the point with the highest weight (ties broken + by most recent updated_at). Deletes the rest. + + Default --dry-run prints metrics; --apply --yes actually deletes. + """ + # Resolve store path. + if args.store_path is not None: + store_path = Path(args.store_path).expanduser() + else: + store_path = Path.home() / ".iai-mcp" + + # We need Qdrant to access the metadata collection directly. + from iai_mcp.qdrant_store import QdrantStore, METADATA_TABLE, EDGES_TABLE + + try: + store = QdrantStore(path=str(store_path)) + except Exception as e: + print(f"error: could not connect to Qdrant: {e}", file=sys.stderr) + return 1 + + apply = bool(getattr(args, "apply", False)) + yes = bool(getattr(args, "yes", False)) + + # Scroll all edge points. + try: + all_edges = store._scroll_all(METADATA_TABLE, table_filter=EDGES_TABLE) + except Exception as e: + print(f"error: could not scroll edges: {e}", file=sys.stderr) + return 1 + + # Group by (src, dst, edge_type) -> list of (point_id, weight, updated_at). + groups: dict[tuple[str, str, str], list[tuple[str, float, str]]] = {} + for point in all_edges: + p = point.payload or {} + key = ( + p.get("src", ""), + p.get("dst", ""), + p.get("edge_type", ""), + ) + weight = float(p.get("weight", 0.0)) + updated_at = p.get("updated_at", "") + groups.setdefault(key, []).append((str(point.id), weight, updated_at)) + + total_points = len(all_edges) + unique_keys = len(groups) + duplicate_keys = {k: v for k, v in groups.items() if len(v) > 1} + points_to_delete = sum(len(v) - 1 for v in duplicate_keys.values()) + + if not duplicate_keys: + print(json.dumps({ + "total_edges": total_points, + "unique_edges": unique_keys, + "duplicates": 0, + "points_to_delete": 0, + }, indent=2)) + return 0 + + if not apply: + # Dry run: print metrics. + print(json.dumps({ + "total_edges": total_points, + "unique_edges": unique_keys, + "duplicate_groups": len(duplicate_keys), + "points_to_delete": points_to_delete, + "sample_duplicates": { + f"{k[0]}->{k[1]}[{k[2]}]": [ + {"id": pid, "weight": w, "updated_at": ua} + for pid, w, ua in v + ] + for k, v in list(duplicate_keys.items())[:5] + }, + }, indent=2)) + return 0 + + # --apply path: pre-flight consent. + if not yes and not sys.stdin.isatty(): + print( + "error: --apply without --yes on non-tty is refused", + file=sys.stderr, + ) + return 2 + + # Delete duplicate points (keep highest weight, tie-break by updated_at). + delete_ids: list[str] = [] + for key, points in duplicate_keys.items(): + # Sort by weight desc, then updated_at desc; keep first, delete rest. + points.sort(key=lambda x: (x[1], x[2]), reverse=True) + delete_ids.extend(pid for pid, _, _ in points[1:]) + + if not delete_ids: + print(json.dumps({ + "total_edges": total_points, + "unique_edges": unique_keys, + "duplicates": 0, + "points_deleted": 0, + }, indent=2)) + return 0 + + # Batch delete in chunks of 1000 (Qdrant API limit). + from qdrant_client import models + + deleted = 0 + for i in range(0, len(delete_ids), 1000): + chunk = delete_ids[i:i+1000] + try: + store._client.delete( + collection_name=METADATA_TABLE, + points_selector=models.PointIdsList(points=chunk), + ) + deleted += len(chunk) + except Exception as e: + print(f"warning: failed to delete chunk: {e}", file=sys.stderr) + + print(json.dumps({ + "total_edges": total_points, + "unique_edges": unique_keys, + "duplicate_groups": len(duplicate_keys), + "points_deleted": deleted, + }, indent=2)) + return 0 + + def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="iai-mcp") sub = parser.add_subparsers(dest="cmd", required=True) @@ -2833,6 +2959,43 @@ def _build_parser() -> argparse.ArgumentParser: ) mtn_sleep.set_defaults(func=cmd_maintenance_sleep_cycle) + # Plan QDRANT-DUP: maintenance dedup-edges subcommand. + # Removes duplicate edge points in the Qdrant metadata collection. + mtn_dedup = mtn_sub.add_parser( + "dedup-edges", + help=( + "(Qdrant) remove duplicate edge points. Each unique (src, dst, " + "edge_type) should have exactly one point; keeps the highest-weight " + "point, deletes the rest. Default --dry-run; --apply requires --yes." + ), + ) + mtn_dedup_mode = mtn_dedup.add_mutually_exclusive_group() + mtn_dedup_mode.add_argument( + "--dry-run", + action="store_true", + default=False, + help="(default) print dedup metrics; do NOT delete", + ) + mtn_dedup_mode.add_argument( + "--apply", + action="store_true", + default=False, + help="actually delete duplicate edge points", + ) + mtn_dedup.add_argument( + "--yes", "-y", + action="store_true", + default=False, + help="(use with --apply) skip the interactive 'y/N' prompt", + ) + mtn_dedup.add_argument( + "--store-path", + dest="store_path", + default=None, + help="IAI root directory (defaults to ~/.iai-mcp)", + ) + mtn_dedup.set_defaults(func=cmd_maintenance_dedup_edges) + # R9: doctor top-level subcommand (D7-10 — same placement # precedent as `iai-mcp schema-cleanup`, NOT nested under # `iai-mcp daemon`). First-touch recovery tool — top-level