fix: _decay_edges — handle string updated_at from Qdrant and add TableShim delete/update
The Qdrant _TableShim returned updated_at as ISO strings (not datetime objects), causing str.replace(tzinfo=...) to fail with "str.replace() takes no keyword arguments". Parse strings via datetime.fromisoformat() before timezone handling. Also add missing delete(where) and update(where=, values=) methods to _TableShim so the LanceDB-compatible shim works with Qdrant's payload collection, plus a _parse_where() helper to convert where clauses to Qdrant Filters.
This commit is contained in:
parent
6bd479b137
commit
7eea3ced28
2 changed files with 137 additions and 4 deletions
|
|
@ -1363,6 +1363,136 @@ class QdrantStore:
|
|||
return pd.DataFrame()
|
||||
return pd.DataFrame()
|
||||
|
||||
def delete(self, where: str) -> None:
|
||||
"""Delete rows from the table matching the where clause.
|
||||
|
||||
LanceDB-compatible shim: ``table.delete("src = 'x' AND dst = 'y'")``.
|
||||
"""
|
||||
if self._name != EDGES_TABLE:
|
||||
return
|
||||
try:
|
||||
cond = self._parse_where(where)
|
||||
scroll_filter = Filter(must=[
|
||||
FieldCondition(key="table", match=MatchValue(value=self._name)),
|
||||
])
|
||||
if cond:
|
||||
scroll_filter.must.append(cond)
|
||||
point_ids = []
|
||||
offset = None
|
||||
while True:
|
||||
points, next_offset = self._store._client.scroll(
|
||||
collection_name=METADATA_TABLE,
|
||||
limit=1000,
|
||||
offset=offset,
|
||||
scroll_filter=scroll_filter,
|
||||
with_payload=False,
|
||||
with_vectors=False,
|
||||
)
|
||||
for pt in points:
|
||||
point_ids.append(pt.id)
|
||||
if next_offset is None:
|
||||
break
|
||||
offset = next_offset
|
||||
if point_ids:
|
||||
self._store._client.delete(
|
||||
collection_name=METADATA_TABLE,
|
||||
points_selector=models.PointIdsList(
|
||||
points=point_ids,
|
||||
),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def update(
|
||||
self,
|
||||
where: str | None = None,
|
||||
values: dict | None = None,
|
||||
) -> None:
|
||||
"""Update rows in the table matching the where clause.
|
||||
|
||||
LanceDB-compatible shim:
|
||||
``table.update(where="src = 'x'", values={"weight": 0.5})``.
|
||||
"""
|
||||
if self._name != EDGES_TABLE:
|
||||
return
|
||||
if not values:
|
||||
return
|
||||
try:
|
||||
cond = self._parse_where(where) if where else None
|
||||
scroll_filter = Filter(must=[
|
||||
FieldCondition(key="table", match=MatchValue(value=self._name)),
|
||||
])
|
||||
if cond:
|
||||
scroll_filter.must.append(cond)
|
||||
# Build updated payload from values dict.
|
||||
updated_payload: dict = {}
|
||||
for k, v in values.items():
|
||||
if k == "updated_at" and isinstance(v, datetime):
|
||||
updated_payload[k] = v.isoformat()
|
||||
else:
|
||||
updated_payload[k] = v
|
||||
# Scroll matching points and upsert with updated payload.
|
||||
point_ids = []
|
||||
offset = None
|
||||
while True:
|
||||
points, next_offset = self._store._client.scroll(
|
||||
collection_name=METADATA_TABLE,
|
||||
limit=1000,
|
||||
offset=offset,
|
||||
scroll_filter=scroll_filter,
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
)
|
||||
for pt in points:
|
||||
point_ids.append(pt.id)
|
||||
if next_offset is None:
|
||||
break
|
||||
offset = next_offset
|
||||
if point_ids:
|
||||
for pid in point_ids:
|
||||
existing = self._store._client.retrieve(
|
||||
collection_name=METADATA_TABLE,
|
||||
ids=[pid],
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
)
|
||||
if existing:
|
||||
base_payload = existing[0].payload or {}
|
||||
base_payload.update(updated_payload)
|
||||
self._store._client.upsert(
|
||||
collection_name=METADATA_TABLE,
|
||||
points=[PointStruct(id=pid, vector={}, payload=base_payload)],
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _parse_where(where: str | None) -> Filter | None:
|
||||
"""Parse a LanceDB-style where clause into a Qdrant Filter.
|
||||
|
||||
Supported format: ``"key = 'value' AND key2 = 'value2'"``.
|
||||
Each ``key = 'value'`` segment becomes a ``FieldCondition``
|
||||
combined with ``must`` (AND semantics).
|
||||
|
||||
Returns None if the clause is empty or cannot be parsed.
|
||||
"""
|
||||
if not where:
|
||||
return None
|
||||
# Find all key = 'value' or key = "value" patterns.
|
||||
pairs = re.findall(
|
||||
r"""([a-z_]+)\s*=\s*['"]([^'"]+)['"]""",
|
||||
where,
|
||||
)
|
||||
if not pairs:
|
||||
return None
|
||||
conditions = [
|
||||
FieldCondition(key=key, match=MatchValue(value=val))
|
||||
for key, val in pairs
|
||||
]
|
||||
if len(conditions) == 1:
|
||||
return Filter(must=conditions)
|
||||
return Filter(must=conditions)
|
||||
|
||||
@property
|
||||
def db(self) -> "QdrantStore._DbShim":
|
||||
"""LanceDB-compatible shim for store.db.open_table()."""
|
||||
|
|
|
|||
|
|
@ -185,10 +185,13 @@ def _decay_edges(
|
|||
last = row["updated_at"]
|
||||
if last is None:
|
||||
continue
|
||||
# Coerce naive -> UTC; pandas may drop tz on some backends.
|
||||
try:
|
||||
py = last.to_pydatetime() if hasattr(last, "to_pydatetime") else last
|
||||
except Exception:
|
||||
# Coerce to datetime: pandas Timestamp -> pydatetime, ISO string -> datetime,
|
||||
# or already a datetime object.
|
||||
if hasattr(last, "to_pydatetime"):
|
||||
py = last.to_pydatetime()
|
||||
elif isinstance(last, str):
|
||||
py = datetime.fromisoformat(last)
|
||||
else:
|
||||
py = last
|
||||
if getattr(py, "tzinfo", None) is None:
|
||||
py = py.replace(tzinfo=timezone.utc)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue