From 7eea3ced28c4d30eb4215859be8b6b93d118d560 Mon Sep 17 00:00:00 2001 From: Apunkt Date: Fri, 15 May 2026 16:59:02 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20=5Fdecay=5Fedges=20=E2=80=94=20handle=20?= =?UTF-8?q?string=20updated=5Fat=20from=20Qdrant=20and=20add=20TableShim?= =?UTF-8?q?=20delete/update?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Qdrant _TableShim returned updated_at as ISO strings (not datetime objects), causing str.replace(tzinfo=...) to fail with "str.replace() takes no keyword arguments". Parse strings via datetime.fromisoformat() before timezone handling. Also add missing delete(where) and update(where=, values=) methods to _TableShim so the LanceDB-compatible shim works with Qdrant's payload collection, plus a _parse_where() helper to convert where clauses to Qdrant Filters. --- src/iai_mcp/qdrant_store.py | 130 ++++++++++++++++++++++++++++++++++++ src/iai_mcp/sleep.py | 11 +-- 2 files changed, 137 insertions(+), 4 deletions(-) diff --git a/src/iai_mcp/qdrant_store.py b/src/iai_mcp/qdrant_store.py index 8cb6914..4900a4d 100644 --- a/src/iai_mcp/qdrant_store.py +++ b/src/iai_mcp/qdrant_store.py @@ -1363,6 +1363,136 @@ class QdrantStore: return pd.DataFrame() return pd.DataFrame() + def delete(self, where: str) -> None: + """Delete rows from the table matching the where clause. + + LanceDB-compatible shim: ``table.delete("src = 'x' AND dst = 'y'")``. + """ + if self._name != EDGES_TABLE: + return + try: + cond = self._parse_where(where) + scroll_filter = Filter(must=[ + FieldCondition(key="table", match=MatchValue(value=self._name)), + ]) + if cond: + scroll_filter.must.append(cond) + point_ids = [] + offset = None + while True: + points, next_offset = self._store._client.scroll( + collection_name=METADATA_TABLE, + limit=1000, + offset=offset, + scroll_filter=scroll_filter, + with_payload=False, + with_vectors=False, + ) + for pt in points: + point_ids.append(pt.id) + if next_offset is None: + break + offset = next_offset + if point_ids: + self._store._client.delete( + collection_name=METADATA_TABLE, + points_selector=models.PointIdsList( + points=point_ids, + ), + ) + except Exception: + pass + + def update( + self, + where: str | None = None, + values: dict | None = None, + ) -> None: + """Update rows in the table matching the where clause. + + LanceDB-compatible shim: + ``table.update(where="src = 'x'", values={"weight": 0.5})``. + """ + if self._name != EDGES_TABLE: + return + if not values: + return + try: + cond = self._parse_where(where) if where else None + scroll_filter = Filter(must=[ + FieldCondition(key="table", match=MatchValue(value=self._name)), + ]) + if cond: + scroll_filter.must.append(cond) + # Build updated payload from values dict. + updated_payload: dict = {} + for k, v in values.items(): + if k == "updated_at" and isinstance(v, datetime): + updated_payload[k] = v.isoformat() + else: + updated_payload[k] = v + # Scroll matching points and upsert with updated payload. + point_ids = [] + offset = None + while True: + points, next_offset = self._store._client.scroll( + collection_name=METADATA_TABLE, + limit=1000, + offset=offset, + scroll_filter=scroll_filter, + with_payload=True, + with_vectors=False, + ) + for pt in points: + point_ids.append(pt.id) + if next_offset is None: + break + offset = next_offset + if point_ids: + for pid in point_ids: + existing = self._store._client.retrieve( + collection_name=METADATA_TABLE, + ids=[pid], + with_payload=True, + with_vectors=False, + ) + if existing: + base_payload = existing[0].payload or {} + base_payload.update(updated_payload) + self._store._client.upsert( + collection_name=METADATA_TABLE, + points=[PointStruct(id=pid, vector={}, payload=base_payload)], + ) + except Exception: + pass + + @staticmethod + def _parse_where(where: str | None) -> Filter | None: + """Parse a LanceDB-style where clause into a Qdrant Filter. + + Supported format: ``"key = 'value' AND key2 = 'value2'"``. + Each ``key = 'value'`` segment becomes a ``FieldCondition`` + combined with ``must`` (AND semantics). + + Returns None if the clause is empty or cannot be parsed. + """ + if not where: + return None + # Find all key = 'value' or key = "value" patterns. + pairs = re.findall( + r"""([a-z_]+)\s*=\s*['"]([^'"]+)['"]""", + where, + ) + if not pairs: + return None + conditions = [ + FieldCondition(key=key, match=MatchValue(value=val)) + for key, val in pairs + ] + if len(conditions) == 1: + return Filter(must=conditions) + return Filter(must=conditions) + @property def db(self) -> "QdrantStore._DbShim": """LanceDB-compatible shim for store.db.open_table().""" diff --git a/src/iai_mcp/sleep.py b/src/iai_mcp/sleep.py index ca844f2..e3e6c5b 100644 --- a/src/iai_mcp/sleep.py +++ b/src/iai_mcp/sleep.py @@ -185,10 +185,13 @@ def _decay_edges( last = row["updated_at"] if last is None: continue - # Coerce naive -> UTC; pandas may drop tz on some backends. - try: - py = last.to_pydatetime() if hasattr(last, "to_pydatetime") else last - except Exception: + # Coerce to datetime: pandas Timestamp -> pydatetime, ISO string -> datetime, + # or already a datetime object. + if hasattr(last, "to_pydatetime"): + py = last.to_pydatetime() + elif isinstance(last, str): + py = datetime.fromisoformat(last) + else: py = last if getattr(py, "tzinfo", None) is None: py = py.replace(tzinfo=timezone.utc)