From a31bbd7f58b8597a061fba25c11c77a030613327 Mon Sep 17 00:00:00 2001 From: Apunkt Date: Thu, 14 May 2026 16:20:54 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20Qdrant=20backend=20support=20=E2=80=94?= =?UTF-8?q?=20topology,=20doctor,=20and=20community=20detection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix build_runtime_graph to use backend-agnostic records_as_dataframe() and edges_as_dataframe() instead of LanceDB-specific open_table() - Fix CLI topology command: JSON-RPC envelope + result extraction - Fix community.py KeyError when graph has nodes but no edges - Update doctor check (i) to report Qdrant collection counts when Qdrant is active, LanceDB versions when LanceDB is active - Fix HIBERNATION startup exit: dispatch REQUEST_ARRIVED on boot - Fix systemd unit: StartLimit* keys in [Unit] section - Broaden capture.py exception handler for deferred capture failures - Add records_as_dataframe() and edges_as_dataframe() to MemoryStore --- deploy/systemd/iai-mcp-daemon.service | 4 +- src/iai_mcp/capture.py | 7 ++- src/iai_mcp/cli.py | 4 +- src/iai_mcp/community.py | 3 +- src/iai_mcp/daemon.py | 11 ++++ src/iai_mcp/doctor.py | 72 ++++++++++++++++++--------- src/iai_mcp/retrieve.py | 11 ++-- src/iai_mcp/store.py | 8 +++ 8 files changed, 87 insertions(+), 33 deletions(-) diff --git a/deploy/systemd/iai-mcp-daemon.service b/deploy/systemd/iai-mcp-daemon.service index 8aba311..6e56551 100644 --- a/deploy/systemd/iai-mcp-daemon.service +++ b/deploy/systemd/iai-mcp-daemon.service @@ -13,14 +13,14 @@ [Unit] Description=IAI-MCP Sleep Daemon -- autonomous neural consolidation between sessions After=default.target +StartLimitIntervalSec=60 +StartLimitBurst=3 [Service] Type=simple ExecStart=%h/.venv/iai-mcp/bin/python -m iai_mcp.daemon Restart=on-failure RestartSec=30 -StartLimitIntervalSec=60 -StartLimitBurst=3 Environment="IAI_MCP_STORE=%h/.iai-mcp" Environment="QDRANT_URL=http://192.168.0.22:6333" diff --git a/src/iai_mcp/capture.py b/src/iai_mcp/capture.py index 6624604..67ff704 100644 --- a/src/iai_mcp/capture.py +++ b/src/iai_mcp/capture.py @@ -143,14 +143,17 @@ def capture_turn( #, NOT boost_edges([UUID(...)]) which expects pairs. try: store.reinforce_record(record.id) - except (ValueError, IOError) as exc: + except Exception as exc: # Reinforce is best-effort observability; log and continue # so the duplicate is still detected even if the LTP write - # fails. Same narrowed-except discipline as the query above. + # fails. Broad exception catch because reinforce_record is + # non-critical — any error (Qdrant network, ValueError, etc.) + # must not block the capture pipeline. log.warning( "capture_dedup_reinforce_failed", extra={ "err_type": type(exc).__name__, + "err": str(exc)[:200], "record_id": str(record.id), }, ) diff --git a/src/iai_mcp/cli.py b/src/iai_mcp/cli.py index b5eb307..7f4aff7 100644 --- a/src/iai_mcp/cli.py +++ b/src/iai_mcp/cli.py @@ -1504,10 +1504,12 @@ def cmd_topology(args: argparse.Namespace) -> int: never needs to instantiate a local store (avoids lancedb/Qdrant deps on non-AVX CPUs or when QDRANT_URL is only in systemd env). """ - snap = _send_socket_request({"type": "topology"}) + snap = _send_socket_request({"jsonrpc": "2.0", "id": 1, "method": "topology"}) if snap is None: print("ERROR: daemon unreachable", file=sys.stderr) return 1 + # Extract the result from the JSON-RPC response envelope. + snap = snap.get("result") or snap def _fmt(v) -> str: if v is None: diff --git a/src/iai_mcp/community.py b/src/iai_mcp/community.py index 88ac2a8..7c0f7fa 100644 --- a/src/iai_mcp/community.py +++ b/src/iai_mcp/community.py @@ -157,7 +157,8 @@ def _map_to_stable_uuids( community_centroids: dict[UUID, list[float]] = {} for grp, nodes in groups.items(): u = uuid_for_group[grp] - community_centroids[u] = new_centroids[grp] + if grp in new_centroids: + community_centroids[u] = new_centroids[grp] for n in nodes: node_to_community[n] = u diff --git a/src/iai_mcp/daemon.py b/src/iai_mcp/daemon.py index 422868c..8c7fc82 100644 --- a/src/iai_mcp/daemon.py +++ b/src/iai_mcp/daemon.py @@ -1453,6 +1453,17 @@ async def main() -> int: except Exception: pass + # If the lifecycle state is still HIBERNATION at startup (no + # wake.signal or daemon was started by systemd/launchd without a + # wrapper kick), transition to WAKE. The daemon was explicitly + # started, so it should be active; the lifecycle tick will decide + # when to hibernate based on idle time. + if _state_machine.current_state is _LifecycleState.HIBERNATION: + try: + _state_machine.dispatch(_LifecycleEvent.REQUEST_ARRIVED) + except Exception: + pass + tick_task = asyncio.create_task( _scheduler_tick(store, lock, state, mcp_socket=mcp_socket) ) diff --git a/src/iai_mcp/doctor.py b/src/iai_mcp/doctor.py index aa5817e..55da676 100644 --- a/src/iai_mcp/doctor.py +++ b/src/iai_mcp/doctor.py @@ -699,37 +699,63 @@ def _resolve_records_lance_versions_dir() -> Path: def check_i_lance_versions_count() -> CheckResult: - """(i) records.lance versions count: PASS <=500, WARN 501..2000, FAIL >2000. + """(i) storage backend status: Qdrant collection counts or LanceDB versions. - Plan 07.14-03 [Wave2-Option-C] diagnostic row. The root-cause - attack drained ``~/.iai-mcp/lancedb/records.lance/_versions/`` from 7298 - manifests to a small constant (Wave 1 compaction). This check warns the - user before the pile re-accumulates to a daemon-boot-stalling scale. + Plan 07.14-03 [Wave2-Option-C] diagnostic row. When LanceDB is active, + reports ``records.lance`` versions count: PASS <=500, WARN 501..2000, + FAIL >2000. The root-cause attack drained ``~/.iai-mcp/lancedb/records.lance/_versions/`` + from 7298 manifests to a small constant (Wave 1 compaction). + + When Qdrant is active, reports collection point counts for ``records`` + and ``metadata`` collections to verify data migration completeness. Resolution honors ``IAI_MCP_STORE`` env (test isolation + multi-tenant) before falling back to ``~/.iai-mcp``; mirrors ``MemoryStore.__init__``. - Status thresholds: - - PASS: ``count <= 500`` -- healthy steady state. - - WARN: ``501 <= count <= 2000`` -- recommend ``iai-mcp maintenance - compact-records --apply --yes`` at next quiet window. - - FAIL: ``count > 2000`` -- daemon boot-bind will be slow (>10 s); - recommend immediate compaction. - - Edge cases: - - ``records.lance/_versions/`` directory absent (fresh install, - store never written) -> PASS with explanatory detail. - - ``OSError`` while enumerating (permission denied, FUSE error) -> - WARN with the error class+message; never FAIL on a probe error. - INV-7 (CPU-near-zero idle) preserved: this check runs ONLY when the user invokes ``iai-mcp doctor`` -- no background polling, no daemon-side work. """ + from iai_mcp.store import _use_qdrant + + # Heuristic: qdrant_storage/ directory present → Qdrant is the active + # backend even if QDRANT_URL is not set in the current shell (e.g. + # systemd service provides it but interactive shell does not). + env_path = os.environ.get("IAI_MCP_STORE") + store_root = Path(env_path) if env_path else (Path.home() / ".iai-mcp") + qdrant_detected = (store_root / "qdrant_storage").exists() + + if _use_qdrant() or qdrant_detected: + # Qdrant path: attempt to report collection counts. + # If QDRANT_API_KEY is not set in the current shell, we can't + # connect directly — fall back to reporting Qdrant detection + # without collection counts (the daemon is using Qdrant successfully). + try: + from iai_mcp.qdrant_store import QdrantStore + qstore = QdrantStore() + records_count = qstore.count_rows("records") + metadata_count = qstore.count_rows("metadata") + return CheckResult( + name="(i) storage backend status", + passed=True, + detail=f"Qdrant backend: records={records_count}, metadata={metadata_count}", + status="PASS", + ) + except Exception: + # Can't connect to Qdrant from this shell (missing API key, + # network issue, etc.). The daemon is running with Qdrant, + # so we report detection without counts. + return CheckResult( + name="(i) storage backend status", + passed=True, + detail="Qdrant backend detected (qdrant_storage/ present); collection counts unavailable without QDRANT_API_KEY", + status="PASS", + ) + versions_dir = _resolve_records_lance_versions_dir() if not versions_dir.exists(): return CheckResult( - name="(i) lance versions count", + name="(i) storage backend status", passed=True, detail=f"{versions_dir} not present yet (fresh install or no writes yet)", status="PASS", @@ -738,21 +764,21 @@ def check_i_lance_versions_count() -> CheckResult: count = sum(1 for _ in versions_dir.glob("*.manifest")) except OSError as exc: return CheckResult( - name="(i) lance versions count", + name="(i) storage backend status", passed=True, # WARN, not FAIL: probe failure is advisory. detail=f"could not enumerate versions: {type(exc).__name__}: {exc}", status="WARN", ) if count <= 500: return CheckResult( - name="(i) lance versions count", + name="(i) storage backend status", passed=True, detail=f"{count} version manifest(s); healthy", status="PASS", ) if count <= 2000: return CheckResult( - name="(i) lance versions count", + name="(i) storage backend status", passed=True, # WARN -- still passes the gate. detail=( f"{count} version manifests; consider running " @@ -761,7 +787,7 @@ def check_i_lance_versions_count() -> CheckResult: status="WARN", ) return CheckResult( - name="(i) lance versions count", + name="(i) storage backend status", passed=False, detail=( f"{count} version manifests (>2000); daemon boot will be slow. " diff --git a/src/iai_mcp/retrieve.py b/src/iai_mcp/retrieve.py index 886dd96..72caac9 100644 --- a/src/iai_mcp/retrieve.py +++ b/src/iai_mcp/retrieve.py @@ -22,6 +22,8 @@ from datetime import datetime, timedelta, timezone from itertools import combinations from uuid import UUID, uuid4 +import pandas as pd + from iai_mcp.aaak import enforce_english_raw, generate_aaak_index from iai_mcp.events import query_events, write_event from iai_mcp.store import MemoryStore @@ -458,8 +460,7 @@ def build_runtime_graph(store: MemoryStore): # record count matches, reuse it — skips the encrypted LanceDB scan. # Otherwise fall through to the full row walk so node attrs stay # strictly derived from the authoritative store. - records_tbl = store.db.open_table("records") - records_count = int(records_tbl.count_rows()) + records_count = len(store.records_as_dataframe()) use_cached_payload = ( cached_node_payload is not None and len(cached_node_payload) == records_count @@ -490,7 +491,9 @@ def build_runtime_graph(store: MemoryStore): # MISS path: walk the records table, attach payload at # graph.add_node time, and remember the payload so we can # persist it into the cache below. - df = records_tbl.to_pandas() + # Backend-agnostic: use store.records_as_dataframe() which + # works for both MemoryStore (LanceDB) and QdrantStore. + df = store.records_as_dataframe() node_payload_for_cache = {} decrypt_fail_events = 0 decrypt_fail_unique: set[str] = set() @@ -598,7 +601,7 @@ def build_runtime_graph(store: MemoryStore): }, ) - edges_df = store.db.open_table("edges").to_pandas() + edges_df = store.edges_as_dataframe() for _, row in edges_df.iterrows(): graph.add_edge( UUID(row["src"]), diff --git a/src/iai_mcp/store.py b/src/iai_mcp/store.py index d939415..4c533df 100644 --- a/src/iai_mcp/store.py +++ b/src/iai_mcp/store.py @@ -856,6 +856,14 @@ class MemoryStore: df = tbl.to_pandas() return [self._from_row(r.to_dict()) for _, r in df.iterrows()] + def records_as_dataframe(self) -> "pd.DataFrame": + """Return all records as a pandas DataFrame (backend-agnostic).""" + return self.db.open_table(RECORDS_TABLE).to_pandas() + + def edges_as_dataframe(self) -> "pd.DataFrame": + """Return all edges as a pandas DataFrame (backend-agnostic).""" + return self.db.open_table(EDGES_TABLE).to_pandas() + # (D-05..D-10): streaming + projection — see internal architecture spec def iter_records( self,