Merge pull request #1475 from CREDO23/fix-bugs

[Fix(indexers)] Stop documents getting stuck in pending/processing on ETL or skip failures
This commit is contained in:
Rohan Verma 2026-06-09 21:51:28 -07:00 committed by GitHub
commit fb6a65cb69
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 581 additions and 70 deletions

View file

@ -1,3 +1,5 @@
import contextlib
import logging
from datetime import UTC, datetime
from sqlalchemy.ext.asyncio import AsyncSession
@ -6,6 +8,8 @@ from sqlalchemy.orm.attributes import set_committed_value
from app.db import Document, DocumentStatus
logger = logging.getLogger(__name__)
async def rollback_and_persist_failure(
session: AsyncSession, document: Document, message: str
@ -18,14 +22,28 @@ async def rollback_and_persist_failure(
try:
await session.rollback()
except Exception:
return # Session is completely dead; nothing further we can do.
# Session is completely dead; surface it but never raise.
logger.warning(
"Rollback failed; cannot persist failed status for document %s",
getattr(document, "id", "unknown"),
exc_info=True,
)
return
try:
await session.refresh(document)
document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.failed(message)
await session.commit()
except Exception:
pass # Best-effort; document will be retried on the next sync.
# Best-effort: the document stays non-ready and is retried next sync.
# Log it so a permanently-stuck document is at least traceable.
logger.warning(
"Could not persist failed status for document %s; will retry next sync",
getattr(document, "id", "unknown"),
exc_info=True,
)
with contextlib.suppress(Exception):
await session.rollback()
def attach_chunks_to_document(document: Document, chunks: list) -> None:

View file

@ -6,6 +6,7 @@ Implements real-time document status updates using a two-phase approach:
- Phase 2: Process each document one by one (pending processing ready/failed)
"""
import contextlib
import time
from collections.abc import Awaitable, Callable
@ -432,10 +433,15 @@ async def index_airtable_records(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
documents_failed += 1
continue

View file

@ -2,6 +2,7 @@
Base functionality and shared imports for connector indexers.
"""
import contextlib
import logging
from datetime import UTC, datetime, timedelta
@ -10,6 +11,8 @@ from sqlalchemy.future import select
from app.db import (
Document,
DocumentStatus,
DocumentType,
SearchSourceConnector,
SearchSourceConnectorType,
)
@ -130,6 +133,59 @@ async def check_document_by_unique_identifier(
return existing_doc_result.scalars().first()
async def mark_connector_documents_failed(
session: AsyncSession,
*,
document_type: DocumentType,
search_space_id: int,
failures: list[tuple[str, str]],
) -> int:
"""Transition placeholder/in-progress documents to ``failed`` by source id.
Without this, a document whose download/ETL fails stays stuck in
``pending``/``processing`` forever: undeletable in the UI and never retried.
``failures`` is a list of ``(unique_id, reason)``. Best-effort: never raises,
and leaves ``ready`` documents untouched. Returns the number marked failed.
"""
if not failures:
return 0
from app.indexing_pipeline.document_hashing import compute_identifier_hash
marked = 0
try:
for unique_id, reason in failures:
if not unique_id:
continue
uid_hash = compute_identifier_hash(
document_type.value, unique_id, search_space_id
)
existing = await check_document_by_unique_identifier(session, uid_hash)
if existing is None:
continue
if DocumentStatus.is_state(existing.status, DocumentStatus.READY):
continue
existing.status = DocumentStatus.failed(reason)
existing.updated_at = datetime.now(UTC)
marked += 1
if marked:
await session.commit()
except Exception:
with contextlib.suppress(Exception):
await session.rollback()
logger.warning(
"Failed to mark %d connector document(s) as failed (type=%s)",
len(failures),
getattr(document_type, "value", document_type),
exc_info=True,
)
return 0
return marked
async def get_connector_by_id(
session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType
) -> SearchSourceConnector | None:

View file

@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback:
- Phase 2: Process each page: pending processing ready/failed
"""
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
@ -432,10 +433,15 @@ async def index_bookstack_pages(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
skipped_pages.append(
f"{item.get('page_name', 'Unknown')} (processing error)"
)

View file

@ -437,10 +437,15 @@ async def index_clickup_tasks(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
documents_failed += 1
continue

View file

@ -21,6 +21,7 @@ from .base import (
check_duplicate_document_by_hash,
get_connector_by_id,
logger,
mark_connector_documents_failed,
update_connector_last_indexed,
)
@ -295,6 +296,23 @@ async def index_confluence_pages(
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# Placeholders for items skipped above (empty/duplicate/unbuildable) would
# otherwise stay stuck in 'pending' and undeletable. Fail them so they're
# recoverable. Leaves already-ready docs untouched.
indexed_ids = {doc.unique_id for doc in connector_docs}
stuck_placeholders = [
(p.unique_id, "Skipped during sync: no indexable content")
for p in placeholders
if p.unique_id and p.unique_id not in indexed_ids
]
if stuck_placeholders:
await mark_connector_documents_failed(
session,
document_type=DocumentType.CONFLUENCE_CONNECTOR,
search_space_id=search_space_id,
failures=stuck_placeholders,
)
await update_connector_last_indexed(session, connector, update_last_indexed)
logger.info(

View file

@ -10,6 +10,7 @@ Uses 2-phase document status updates for real-time UI feedback:
"""
import asyncio
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime, timedelta
@ -713,10 +714,15 @@ async def index_discord_messages(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
documents_failed += 1
continue

View file

@ -26,12 +26,14 @@ from app.connectors.dropbox.file_types import should_skip_file as skip_item
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.exceptions import safe_exception_message
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
get_connector_by_id,
mark_connector_documents_failed,
update_connector_last_indexed,
)
@ -112,7 +114,12 @@ async def _should_skip_file(
logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'")
return True, f"File renamed: '{old_name}' -> '{file_name}'"
if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
state = DocumentStatus.get_state(existing.status)
if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING):
# Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index
# instead of skipping, otherwise it never recovers.
return False, None
if state != DocumentStatus.READY:
return True, "skipped (previously failed)"
return True, "unchanged"
@ -158,15 +165,20 @@ async def _download_files_parallel(
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel. Returns (docs, failed_count)."""
) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]:
"""Download and ETL files in parallel.
Returns (docs, failed_files), where failed_files is a list of
(file_id, reason) so callers can mark those placeholders failed.
"""
results: list[ConnectorDocument] = []
sem = asyncio.Semaphore(max_concurrency)
last_heartbeat = time.time()
completed_count = 0
hb_lock = asyncio.Lock()
async def _download_one(file: dict) -> ConnectorDocument | None:
async def _download_one(file: dict) -> ConnectorDocument | str:
# ConnectorDocument on success; failure reason string otherwise.
nonlocal last_heartbeat, completed_count
async with sem:
markdown, db_metadata, error = await download_and_extract_content(
@ -176,7 +188,7 @@ async def _download_files_parallel(
file_name = file.get("name", "Unknown")
reason = error or "empty content"
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
return None
return f"Download/ETL failed: {reason}"
doc = _build_connector_doc(
file,
markdown,
@ -197,14 +209,28 @@ async def _download_files_parallel(
tasks = [_download_one(f) for f in files]
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
failed_files: list[tuple[str, str]] = []
for file, outcome in zip(files, outcomes, strict=False):
if isinstance(outcome, ConnectorDocument):
results.append(outcome)
continue
file_id = file.get("id")
if isinstance(outcome, Exception):
reason = f"Download/ETL error: {safe_exception_message(outcome)}"
logger.warning(
"Download/ETL exception for %s: %s",
file.get("name", "Unknown"),
outcome,
exc_info=outcome,
)
elif isinstance(outcome, str):
reason = outcome
else:
reason = "Download or extraction failed"
if file_id:
failed_files.append((file_id, reason))
return results, failed
return results, failed_files
async def _download_and_index(
@ -219,7 +245,7 @@ async def _download_and_index(
vision_llm=None,
) -> tuple[int, int]:
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
connector_docs, download_failed = await _download_files_parallel(
connector_docs, failed_files = await _download_files_parallel(
dropbox_client,
files,
connector_id=connector_id,
@ -229,6 +255,15 @@ async def _download_and_index(
vision_llm=vision_llm,
)
# Fail rows for files whose download/ETL failed, so they don't stay stuck.
if failed_files:
await mark_connector_documents_failed(
session,
document_type=DocumentType.DROPBOX_FILE,
search_space_id=search_space_id,
failures=failed_files,
)
batch_indexed = 0
batch_failed = 0
if connector_docs:
@ -239,7 +274,7 @@ async def _download_and_index(
on_heartbeat=on_heartbeat,
)
return batch_indexed, download_failed + batch_failed
return batch_indexed, len(failed_files) + batch_failed
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):

View file

@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback:
- Phase 2: Process each document: pending processing ready/failed
"""
import contextlib
import json
import logging
import time
@ -406,10 +407,15 @@ async def index_elasticsearch_documents(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
documents_failed += 1
continue

View file

@ -9,6 +9,7 @@ Implements 2-phase document status updates for real-time UI feedback:
- Phase 2: Process each document: pending processing ready/failed
"""
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
@ -413,10 +414,15 @@ async def index_github_repos(
try:
document.status = DocumentStatus.failed(str(repo_err))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
errors.append(f"Failed processing {repo_full_name}: {repo_err}")
documents_failed += 1
continue

View file

@ -28,6 +28,7 @@ from .base import (
check_duplicate_document_by_hash,
get_connector_by_id,
logger,
mark_connector_documents_failed,
parse_date_flexible,
update_connector_last_indexed,
)
@ -448,6 +449,23 @@ async def index_google_calendar_events(
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# Placeholders for items skipped above (empty/duplicate/unbuildable) would
# otherwise stay stuck in 'pending' and undeletable. Fail them so they're
# recoverable. Leaves already-ready docs untouched.
indexed_ids = {doc.unique_id for doc in connector_docs}
stuck_placeholders = [
(p.unique_id, "Skipped during sync: no indexable content")
for p in placeholders
if p.unique_id and p.unique_id not in indexed_ids
]
if stuck_placeholders:
await mark_connector_documents_failed(
session,
document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
search_space_id=search_space_id,
failures=stuck_placeholders,
)
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -35,6 +35,7 @@ from app.connectors.google_drive.file_types import (
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.exceptions import safe_exception_message
from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
@ -45,6 +46,7 @@ from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
get_connector_by_id,
mark_connector_documents_failed,
update_connector_last_indexed,
)
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
@ -367,7 +369,12 @@ async def _should_skip_file(
logger.info(f"Rename-only update: '{old_name}''{file_name}'")
return True, f"File renamed: '{old_name}''{file_name}'"
if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
state = DocumentStatus.get_state(existing.status)
if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING):
# Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index
# instead of skipping, otherwise it never recovers.
return False, None
if state != DocumentStatus.READY:
return True, "skipped (previously failed)"
return True, "unchanged"
@ -458,10 +465,11 @@ async def _download_files_parallel(
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel, returning ConnectorDocuments.
) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]:
"""Download and ETL files in parallel.
Returns (connector_docs, download_failed_count).
Returns (connector_docs, failed_files), where failed_files is a list of
(file_id, reason) so callers can mark those placeholders failed.
"""
results: list[ConnectorDocument] = []
sem = asyncio.Semaphore(max_concurrency)
@ -469,7 +477,8 @@ async def _download_files_parallel(
completed_count = 0
hb_lock = asyncio.Lock()
async def _download_one(file: dict) -> ConnectorDocument | None:
async def _download_one(file: dict) -> ConnectorDocument | str:
# ConnectorDocument on success; failure reason string otherwise.
nonlocal last_heartbeat, completed_count
async with sem:
markdown, drive_metadata, error = await download_and_extract_content(
@ -479,7 +488,7 @@ async def _download_files_parallel(
file_name = file.get("name", "Unknown")
reason = error or "empty content"
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
return None
return f"Download/ETL failed: {reason}"
doc = _build_connector_doc(
file,
markdown,
@ -500,14 +509,28 @@ async def _download_files_parallel(
tasks = [_download_one(f) for f in files]
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
failed_files: list[tuple[str, str]] = []
for file, outcome in zip(files, outcomes, strict=False):
if isinstance(outcome, ConnectorDocument):
results.append(outcome)
continue
file_id = file.get("id")
if isinstance(outcome, Exception):
reason = f"Download/ETL error: {safe_exception_message(outcome)}"
logger.warning(
"Download/ETL exception for %s: %s",
file.get("name", "Unknown"),
outcome,
exc_info=outcome,
)
elif isinstance(outcome, str):
reason = outcome
else:
reason = "Download or extraction failed"
if file_id:
failed_files.append((file_id, reason))
return results, failed
return results, failed_files
async def _process_single_file(
@ -542,7 +565,16 @@ async def _process_single_file(
drive_client, file, vision_llm=vision_llm
)
if error or not markdown:
logger.warning(f"ETL failed for {file_name}: {error}")
reason = error or "empty content"
logger.warning(f"ETL failed for {file_name}: {reason}")
file_id = file.get("id")
if file_id:
await mark_connector_documents_failed(
session,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
search_space_id=search_space_id,
failures=[(file_id, f"Download/ETL failed: {reason}")],
)
return 0, 1, 0
doc = _build_connector_doc(
@ -630,7 +662,7 @@ async def _download_and_index(
Returns (batch_indexed, total_failed).
"""
connector_docs, download_failed = await _download_files_parallel(
connector_docs, failed_files = await _download_files_parallel(
drive_client,
files,
connector_id=connector_id,
@ -640,6 +672,16 @@ async def _download_and_index(
vision_llm=vision_llm,
)
# Fail the placeholders for files whose download/ETL failed, so they don't
# stay stuck in 'pending'.
if failed_files:
await mark_connector_documents_failed(
session,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
search_space_id=search_space_id,
failures=failed_files,
)
batch_indexed = 0
batch_failed = 0
if connector_docs:
@ -650,7 +692,7 @@ async def _download_and_index(
on_heartbeat=on_heartbeat,
)
return batch_indexed, download_failed + batch_failed
return batch_indexed, len(failed_files) + batch_failed
async def _index_selected_files(

View file

@ -29,6 +29,7 @@ from .base import (
check_duplicate_document_by_hash,
get_connector_by_id,
logger,
mark_connector_documents_failed,
update_connector_last_indexed,
)
@ -479,6 +480,23 @@ async def index_google_gmail_messages(
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# Placeholders for items skipped above (empty/duplicate/unbuildable) would
# otherwise stay stuck in 'pending' and undeletable. Fail them so they're
# recoverable. Leaves already-ready docs untouched.
indexed_ids = {doc.unique_id for doc in connector_docs}
stuck_placeholders = [
(p.unique_id, "Skipped during sync: no indexable content")
for p in placeholders
if p.unique_id and p.unique_id not in indexed_ids
]
if stuck_placeholders:
await mark_connector_documents_failed(
session,
document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
search_space_id=search_space_id,
failures=stuck_placeholders,
)
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -25,6 +25,7 @@ from .base import (
check_duplicate_document_by_hash,
get_connector_by_id,
logger,
mark_connector_documents_failed,
update_connector_last_indexed,
)
@ -303,6 +304,23 @@ async def index_linear_issues(
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# Placeholders for items skipped above (empty/duplicate/unbuildable) would
# otherwise stay stuck in 'pending' and undeletable. Fail them so they're
# recoverable. Leaves already-ready docs untouched.
indexed_ids = {doc.unique_id for doc in connector_docs}
stuck_placeholders = [
(p.unique_id, "Skipped during sync: no indexable content")
for p in placeholders
if p.unique_id and p.unique_id not in indexed_ids
]
if stuck_placeholders:
await mark_connector_documents_failed(
session,
document_type=DocumentType.LINEAR_CONNECTOR,
search_space_id=search_space_id,
failures=stuck_placeholders,
)
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -7,6 +7,7 @@ Implements 2-phase document status updates for real-time UI feedback:
"""
import asyncio
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta
@ -485,10 +486,15 @@ async def index_luma_events(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
skipped_events.append(
f"{item.get('event_name', 'Unknown')} (processing error)"
)

View file

@ -27,6 +27,7 @@ from .base import (
check_duplicate_document_by_hash,
get_connector_by_id,
logger,
mark_connector_documents_failed,
update_connector_last_indexed,
)
@ -343,6 +344,23 @@ async def index_notion_pages(
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,
)
# Placeholders for items skipped above (empty/duplicate/unbuildable) would
# otherwise stay stuck in 'pending' and undeletable. Fail them so they're
# recoverable. Leaves already-ready docs untouched.
indexed_ids = {doc.unique_id for doc in connector_docs}
stuck_placeholders = [
(p.unique_id, "Skipped during sync: no indexable content")
for p in placeholders
if p.unique_id and p.unique_id not in indexed_ids
]
if stuck_placeholders:
await mark_connector_documents_failed(
session,
document_type=DocumentType.NOTION_CONNECTOR,
search_space_id=search_space_id,
failures=stuck_placeholders,
)
# ── Finalize ──────────────────────────────────────────────────
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -26,12 +26,14 @@ from app.connectors.onedrive.file_types import should_skip_file as skip_item
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.exceptions import safe_exception_message
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
get_connector_by_id,
mark_connector_documents_failed,
update_connector_last_indexed,
)
@ -119,7 +121,12 @@ async def _should_skip_file(
logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'")
return True, f"File renamed: '{old_name}' -> '{file_name}'"
if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
state = DocumentStatus.get_state(existing.status)
if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING):
# Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index
# instead of skipping, otherwise it never recovers.
return False, None
if state != DocumentStatus.READY:
return True, "skipped (previously failed)"
return True, "unchanged"
@ -165,15 +172,20 @@ async def _download_files_parallel(
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel. Returns (docs, failed_count)."""
) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]:
"""Download and ETL files in parallel.
Returns (docs, failed_files), where failed_files is a list of
(file_id, reason) so callers can mark those placeholders failed.
"""
results: list[ConnectorDocument] = []
sem = asyncio.Semaphore(max_concurrency)
last_heartbeat = time.time()
completed_count = 0
hb_lock = asyncio.Lock()
async def _download_one(file: dict) -> ConnectorDocument | None:
async def _download_one(file: dict) -> ConnectorDocument | str:
# ConnectorDocument on success; failure reason string otherwise.
nonlocal last_heartbeat, completed_count
async with sem:
markdown, od_metadata, error = await download_and_extract_content(
@ -183,7 +195,7 @@ async def _download_files_parallel(
file_name = file.get("name", "Unknown")
reason = error or "empty content"
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
return None
return f"Download/ETL failed: {reason}"
doc = _build_connector_doc(
file,
markdown,
@ -204,14 +216,28 @@ async def _download_files_parallel(
tasks = [_download_one(f) for f in files]
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
failed_files: list[tuple[str, str]] = []
for file, outcome in zip(files, outcomes, strict=False):
if isinstance(outcome, ConnectorDocument):
results.append(outcome)
continue
file_id = file.get("id")
if isinstance(outcome, Exception):
reason = f"Download/ETL error: {safe_exception_message(outcome)}"
logger.warning(
"Download/ETL exception for %s: %s",
file.get("name", "Unknown"),
outcome,
exc_info=outcome,
)
elif isinstance(outcome, str):
reason = outcome
else:
reason = "Download or extraction failed"
if file_id:
failed_files.append((file_id, reason))
return results, failed
return results, failed_files
async def _download_and_index(
@ -226,7 +252,7 @@ async def _download_and_index(
vision_llm=None,
) -> tuple[int, int]:
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
connector_docs, download_failed = await _download_files_parallel(
connector_docs, failed_files = await _download_files_parallel(
onedrive_client,
files,
connector_id=connector_id,
@ -236,6 +262,15 @@ async def _download_and_index(
vision_llm=vision_llm,
)
# Fail rows for files whose download/ETL failed, so they don't stay stuck.
if failed_files:
await mark_connector_documents_failed(
session,
document_type=DocumentType.ONEDRIVE_FILE,
search_space_id=search_space_id,
failures=failed_files,
)
batch_indexed = 0
batch_failed = 0
if connector_docs:
@ -246,7 +281,7 @@ async def _download_and_index(
on_heartbeat=on_heartbeat,
)
return batch_indexed, download_failed + batch_failed
return batch_indexed, len(failed_files) + batch_failed
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):

View file

@ -9,6 +9,7 @@ Uses 2-phase document status updates for real-time UI feedback:
- Phase 2: Process each document: pending processing ready/failed
"""
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
@ -586,10 +587,15 @@ async def index_slack_messages(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
)
with contextlib.suppress(Exception):
await session.rollback()
documents_failed += 1
continue

View file

@ -10,6 +10,7 @@ Uses 2-phase document status updates for real-time UI feedback:
"""
import asyncio
import contextlib
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
@ -630,11 +631,16 @@ async def index_teams_messages(
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
# Commit now so the failed status survives a later rollback or
# crash; otherwise the doc stays stuck in pending/processing.
await session.commit()
except Exception as status_error:
logger.error(
"Failed to update document status to failed: %s",
str(status_error),
)
with contextlib.suppress(Exception):
await session.rollback()
documents_failed += 1
continue

View file

@ -177,3 +177,75 @@ async def test_should_skip_file_skips_failed_document(
assert should_skip, "FAILED documents must be skipped during automatic sync"
assert "failed" in msg.lower()
@pytest.mark.parametrize("stuck_state", ["pending", "processing"])
async def test_should_skip_file_retries_stuck_document(
db_session,
db_search_space,
db_user,
stuck_state,
):
"""A doc stuck in pending/processing (worker died mid-index) must re-index, not skip."""
import importlib
import sys
import types
pkg = "app.tasks.connector_indexers"
stub = pkg not in sys.modules
if stub:
mod = types.ModuleType(pkg)
mod.__path__ = ["app/tasks/connector_indexers"]
mod.__package__ = pkg
sys.modules[pkg] = mod
try:
gdm = importlib.import_module(
"app.tasks.connector_indexers.google_drive_indexer"
)
_should_skip_file = gdm._should_skip_file
finally:
if stub:
sys.modules.pop(pkg, None)
space_id = db_search_space.id
file_id = f"file-{stuck_state}-drive"
md5 = "stuck123checksum"
doc_hash = compute_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE.value, file_id, space_id
)
status = (
DocumentStatus.pending()
if stuck_state == "pending"
else DocumentStatus.processing()
)
stuck_doc = Document(
title="Stuck File.pdf",
document_type=DocumentType.GOOGLE_DRIVE_FILE,
content="Pending...",
content_hash=f"ch-{doc_hash[:12]}",
unique_identifier_hash=doc_hash,
source_markdown="",
search_space_id=space_id,
created_by_id=str(db_user.id),
status=status,
document_metadata={
"google_drive_file_id": file_id,
"google_drive_file_name": "Stuck File.pdf",
"md5_checksum": md5,
},
)
db_session.add(stuck_doc)
await db_session.flush()
incoming_file = {
"id": file_id,
"name": "Stuck File.pdf",
"mimeType": "application/pdf",
"md5Checksum": md5,
}
should_skip, _msg = await _should_skip_file(db_session, incoming_file, space_id)
assert not should_skip, f"{stuck_state} documents must re-index, not be skipped"

View file

@ -0,0 +1,110 @@
"""Integration tests for mark_connector_documents_failed.
Covers the ETL-failure recovery path: a connector placeholder must move out of
``pending``/``processing`` into ``failed`` so it stays deletable, while a
``ready`` document is never clobbered.
"""
import hashlib
import pytest
from sqlalchemy import select
from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.tasks.connector_indexers.base import mark_connector_documents_failed
pytestmark = pytest.mark.integration
async def _make_doc(
db_session,
*,
search_space_id: int,
connector_id: int,
user_id: str,
file_id: str,
status: dict,
) -> Document:
uid_hash = compute_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE.value, file_id, search_space_id
)
doc = Document(
title=f"{file_id}.pdf",
document_type=DocumentType.GOOGLE_DRIVE_FILE,
content="Pending...",
content_hash=hashlib.sha256(f"placeholder:{uid_hash}".encode()).hexdigest(),
unique_identifier_hash=uid_hash,
document_metadata={"google_drive_file_id": file_id},
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
status=status,
)
db_session.add(doc)
await db_session.flush()
return doc
async def test_pending_placeholder_marked_failed(
db_session, db_search_space, db_connector, db_user
):
doc = await _make_doc(
db_session,
search_space_id=db_search_space.id,
connector_id=db_connector.id,
user_id=str(db_user.id),
file_id="file-pending",
status=DocumentStatus.pending(),
)
marked = await mark_connector_documents_failed(
db_session,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
search_space_id=db_search_space.id,
failures=[("file-pending", "Download/ETL failed: boom")],
)
assert marked == 1
await db_session.refresh(doc)
assert DocumentStatus.is_state(doc.status, DocumentStatus.FAILED)
assert doc.status.get("reason") == "Download/ETL failed: boom"
async def test_ready_document_not_clobbered(
db_session, db_search_space, db_connector, db_user
):
doc = await _make_doc(
db_session,
search_space_id=db_search_space.id,
connector_id=db_connector.id,
user_id=str(db_user.id),
file_id="file-ready",
status=DocumentStatus.ready(),
)
marked = await mark_connector_documents_failed(
db_session,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
search_space_id=db_search_space.id,
failures=[("file-ready", "should be ignored")],
)
assert marked == 0
await db_session.refresh(doc)
assert DocumentStatus.is_state(doc.status, DocumentStatus.READY)
async def test_missing_document_is_noop(db_session, db_search_space):
marked = await mark_connector_documents_failed(
db_session,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
search_space_id=db_search_space.id,
failures=[("does-not-exist", "reason")],
)
assert marked == 0
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
assert result.scalars().first() is None

View file

@ -74,7 +74,7 @@ async def test_single_file_returns_one_connector_document(
)
assert len(docs) == 1
assert failed == 0
assert failed == []
assert docs[0].title == "test.txt"
assert docs[0].unique_id == "f1"
assert docs[0].document_type == DocumentType.DROPBOX_FILE
@ -99,7 +99,7 @@ async def test_multiple_files_all_produce_documents(
)
assert len(docs) == 3
assert failed == 0
assert failed == []
assert {d.unique_id for d in docs} == {"f0", "f1", "f2"}
@ -126,7 +126,7 @@ async def test_one_download_exception_does_not_block_others(
)
assert len(docs) == 2
assert failed == 1
assert len(failed) == 1
assert {d.unique_id for d in docs} == {"f0", "f2"}
@ -152,7 +152,7 @@ async def test_etl_error_counts_as_download_failure(
)
assert len(docs) == 1
assert failed == 1
assert len(failed) == 1
# Slice 5: Semaphore bound
@ -191,7 +191,7 @@ async def test_concurrency_bounded_by_semaphore(
)
assert len(docs) == 6
assert failed == 0
assert failed == []
assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2"
@ -230,7 +230,7 @@ async def test_heartbeat_fires_during_parallel_downloads(
)
assert len(docs) == 3
assert failed == 0
assert failed == []
assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once"

View file

@ -69,7 +69,7 @@ async def test_single_file_returns_one_connector_document(
)
assert len(docs) == 1
assert failed == 0
assert failed == []
assert docs[0].title == "test.txt"
assert docs[0].unique_id == "f1"
@ -93,7 +93,7 @@ async def test_multiple_files_all_produce_documents(
)
assert len(docs) == 3
assert failed == 0
assert failed == []
assert {d.unique_id for d in docs} == {"f0", "f1", "f2"}
@ -120,7 +120,7 @@ async def test_one_download_exception_does_not_block_others(
)
assert len(docs) == 2
assert failed == 1
assert len(failed) == 1
assert {d.unique_id for d in docs} == {"f0", "f2"}
@ -146,7 +146,7 @@ async def test_etl_error_counts_as_download_failure(
)
assert len(docs) == 1
assert failed == 1
assert len(failed) == 1
async def test_concurrency_bounded_by_semaphore(
@ -186,7 +186,7 @@ async def test_concurrency_bounded_by_semaphore(
)
assert len(docs) == 6
assert failed == 0
assert failed == []
assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2"
@ -225,7 +225,7 @@ async def test_heartbeat_fires_during_parallel_downloads(
)
assert len(docs) == 3
assert failed == 0
assert failed == []
assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once"
@ -281,7 +281,7 @@ def full_scan_mocks(mock_drive_client, monkeypatch):
monkeypatch.setattr(_mod, "_should_skip_file", _fake_skip)
download_mock = AsyncMock(return_value=([], 0))
download_mock = AsyncMock(return_value=([], []))
monkeypatch.setattr(_mod, "_download_files_parallel", download_mock)
batch_mock = AsyncMock(return_value=([], 0, 0))
@ -350,7 +350,7 @@ async def test_full_scan_three_phase_counts(full_scan_mocks, monkeypatch):
)
mock_docs = [MagicMock(), MagicMock()]
full_scan_mocks["download_mock"].return_value = (mock_docs, 0)
full_scan_mocks["download_mock"].return_value = (mock_docs, [])
full_scan_mocks["batch_mock"].return_value = ([], 2, 0)
indexed, skipped, _unsupported = await _run_full_scan(full_scan_mocks)
@ -376,7 +376,7 @@ async def test_full_scan_respects_max_files(full_scan_mocks, monkeypatch):
AsyncMock(return_value=(page_files, None, None)),
)
full_scan_mocks["download_mock"].return_value = ([], 0)
full_scan_mocks["download_mock"].return_value = ([], [])
full_scan_mocks["batch_mock"].return_value = ([], 0, 0)
await _run_full_scan(full_scan_mocks, max_files=3)
@ -400,7 +400,7 @@ async def test_full_scan_uses_max_concurrency_3_for_indexing(
)
mock_docs = [MagicMock()]
full_scan_mocks["download_mock"].return_value = (mock_docs, 0)
full_scan_mocks["download_mock"].return_value = (mock_docs, [])
full_scan_mocks["batch_mock"].return_value = ([], 1, 0)
await _run_full_scan(full_scan_mocks)
@ -462,7 +462,7 @@ async def test_delta_sync_removals_serial_rest_parallel(monkeypatch):
)
mock_docs = [MagicMock(), MagicMock()]
download_mock = AsyncMock(return_value=(mock_docs, 0))
download_mock = AsyncMock(return_value=(mock_docs, []))
monkeypatch.setattr(_mod, "_download_files_parallel", download_mock)
batch_mock = AsyncMock(return_value=([], 2, 0))

View file

@ -68,7 +68,7 @@ async def test_single_file_returns_one_connector_document(
)
assert len(docs) == 1
assert failed == 0
assert failed == []
assert docs[0].title == "test.txt"
assert docs[0].unique_id == "f1"
assert docs[0].document_type == DocumentType.ONEDRIVE_FILE
@ -93,7 +93,7 @@ async def test_multiple_files_all_produce_documents(
)
assert len(docs) == 3
assert failed == 0
assert failed == []
assert {d.unique_id for d in docs} == {"f0", "f1", "f2"}
@ -120,7 +120,7 @@ async def test_one_download_exception_does_not_block_others(
)
assert len(docs) == 2
assert failed == 1
assert len(failed) == 1
assert {d.unique_id for d in docs} == {"f0", "f2"}
@ -146,7 +146,7 @@ async def test_etl_error_counts_as_download_failure(
)
assert len(docs) == 1
assert failed == 1
assert len(failed) == 1
# Slice 5: Semaphore bound
@ -185,7 +185,7 @@ async def test_concurrency_bounded_by_semaphore(
)
assert len(docs) == 6
assert failed == 0
assert failed == []
assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2"
@ -224,5 +224,5 @@ async def test_heartbeat_fires_during_parallel_downloads(
)
assert len(docs) == 3
assert failed == 0
assert failed == []
assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once"

View file

@ -325,7 +325,7 @@ def gdrive_full_scan_mocks(monkeypatch):
_mod, "_should_skip_file", AsyncMock(return_value=(False, None))
)
download_mock = AsyncMock(return_value=([], 0))
download_mock = AsyncMock(return_value=([], []))
monkeypatch.setattr(_mod, "_download_files_parallel", download_mock)
batch_mock = AsyncMock(return_value=([], 0, 0))
@ -377,7 +377,7 @@ async def test_gdrive_full_scan_skips_over_quota(gdrive_full_scan_mocks, monkeyp
"get_files_in_folder",
AsyncMock(return_value=(page_files, None, None)),
)
m["download_mock"].return_value = ([], 0)
m["download_mock"].return_value = ([], [])
m["batch_mock"].return_value = ([], 2, 0)
_indexed, skipped, _unsup = await _run_gdrive_full_scan(m)
@ -403,7 +403,7 @@ async def test_gdrive_full_scan_deducts_after_indexing(
AsyncMock(return_value=(page_files, None, None)),
)
mock_docs = [MagicMock() for _ in range(3)]
m["download_mock"].return_value = (mock_docs, 0)
m["download_mock"].return_value = (mock_docs, [])
m["batch_mock"].return_value = ([], 3, 0)
await _run_gdrive_full_scan(m)
@ -438,7 +438,7 @@ async def test_gdrive_delta_sync_skips_over_quota(monkeypatch):
_mod, "_should_skip_file", AsyncMock(return_value=(False, None))
)
download_mock = AsyncMock(return_value=([], 0))
download_mock = AsyncMock(return_value=([], []))
monkeypatch.setattr(_mod, "_download_files_parallel", download_mock)
batch_mock = AsyncMock(return_value=([], 2, 0))