mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-10 20:35:17 +02:00
feat(indexers): add mark_connector_documents_failed helper
This commit is contained in:
parent
dec5a28d65
commit
cb10882dc8
1 changed files with 56 additions and 0 deletions
|
|
@ -2,6 +2,7 @@
|
|||
Base functionality and shared imports for connector indexers.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
|
|
@ -10,6 +11,8 @@ from sqlalchemy.future import select
|
|||
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentStatus,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
)
|
||||
|
|
@ -130,6 +133,59 @@ async def check_document_by_unique_identifier(
|
|||
return existing_doc_result.scalars().first()
|
||||
|
||||
|
||||
async def mark_connector_documents_failed(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
document_type: DocumentType,
|
||||
search_space_id: int,
|
||||
failures: list[tuple[str, str]],
|
||||
) -> int:
|
||||
"""Transition placeholder/in-progress documents to ``failed`` by source id.
|
||||
|
||||
Without this, a document whose download/ETL fails stays stuck in
|
||||
``pending``/``processing`` forever: undeletable in the UI and never retried.
|
||||
|
||||
``failures`` is a list of ``(unique_id, reason)``. Best-effort: never raises,
|
||||
and leaves ``ready`` documents untouched. Returns the number marked failed.
|
||||
"""
|
||||
if not failures:
|
||||
return 0
|
||||
|
||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||
|
||||
marked = 0
|
||||
try:
|
||||
for unique_id, reason in failures:
|
||||
if not unique_id:
|
||||
continue
|
||||
uid_hash = compute_identifier_hash(
|
||||
document_type.value, unique_id, search_space_id
|
||||
)
|
||||
existing = await check_document_by_unique_identifier(session, uid_hash)
|
||||
if existing is None:
|
||||
continue
|
||||
if DocumentStatus.is_state(existing.status, DocumentStatus.READY):
|
||||
continue
|
||||
existing.status = DocumentStatus.failed(reason)
|
||||
existing.updated_at = datetime.now(UTC)
|
||||
marked += 1
|
||||
|
||||
if marked:
|
||||
await session.commit()
|
||||
except Exception:
|
||||
with contextlib.suppress(Exception):
|
||||
await session.rollback()
|
||||
logger.warning(
|
||||
"Failed to mark %d connector document(s) as failed (type=%s)",
|
||||
len(failures),
|
||||
getattr(document_type, "value", document_type),
|
||||
exc_info=True,
|
||||
)
|
||||
return 0
|
||||
|
||||
return marked
|
||||
|
||||
|
||||
async def get_connector_by_id(
|
||||
session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType
|
||||
) -> SearchSourceConnector | None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue