mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-27 19:25:15 +02:00
feat(indexing): track indexing and connector outcomes
This commit is contained in:
parent
b9d76f006d
commit
cea5605e32
2 changed files with 54 additions and 1 deletions
|
|
@ -57,6 +57,7 @@ from app.indexing_pipeline.pipeline_logger import (
|
||||||
log_retryable_llm_error,
|
log_retryable_llm_error,
|
||||||
log_unexpected_error,
|
log_unexpected_error,
|
||||||
)
|
)
|
||||||
|
from app.observability import metrics as ot_metrics, otel as ot
|
||||||
from app.utils.perf import get_perf_logger
|
from app.utils.perf import get_perf_logger
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -362,6 +363,16 @@ class IndexingPipelineService:
|
||||||
)
|
)
|
||||||
perf = get_perf_logger()
|
perf = get_perf_logger()
|
||||||
t_index = time.perf_counter()
|
t_index = time.perf_counter()
|
||||||
|
document_type = (
|
||||||
|
document.document_type.value
|
||||||
|
if getattr(document, "document_type", None)
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
persist_span_cm = ot.kb_persist_span(
|
||||||
|
document_type=document_type,
|
||||||
|
)
|
||||||
|
persist_span = persist_span_cm.__enter__()
|
||||||
|
outcome_status = "failed"
|
||||||
try:
|
try:
|
||||||
log_index_started(ctx)
|
log_index_started(ctx)
|
||||||
document.status = DocumentStatus.processing()
|
document.status = DocumentStatus.processing()
|
||||||
|
|
@ -429,11 +440,13 @@ class IndexingPipelineService:
|
||||||
time.perf_counter() - t_index,
|
time.perf_counter() - t_index,
|
||||||
)
|
)
|
||||||
log_index_success(ctx, chunk_count=len(chunks))
|
log_index_success(ctx, chunk_count=len(chunks))
|
||||||
|
outcome_status = "success"
|
||||||
|
|
||||||
await self._enqueue_ai_sort_if_enabled(document)
|
await self._enqueue_ai_sort_if_enabled(document)
|
||||||
|
|
||||||
except RETRYABLE_LLM_ERRORS as e:
|
except RETRYABLE_LLM_ERRORS as e:
|
||||||
log_retryable_llm_error(ctx, e)
|
log_retryable_llm_error(ctx, e)
|
||||||
|
outcome_status = "requeued"
|
||||||
await rollback_and_persist_failure(
|
await rollback_and_persist_failure(
|
||||||
self.session, document, llm_retryable_message(e)
|
self.session, document, llm_retryable_message(e)
|
||||||
)
|
)
|
||||||
|
|
@ -465,6 +478,17 @@ class IndexingPipelineService:
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
await self.session.refresh(document)
|
await self.session.refresh(document)
|
||||||
|
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
persist_span.set_attribute("indexing.status", outcome_status)
|
||||||
|
ot_metrics.record_indexing_document_duration(
|
||||||
|
time.perf_counter() - t_index,
|
||||||
|
document_type=document_type,
|
||||||
|
)
|
||||||
|
ot_metrics.record_indexing_document_outcome(
|
||||||
|
document_type=document_type,
|
||||||
|
status=outcome_status,
|
||||||
|
)
|
||||||
|
persist_span_cm.__exit__(None, None, None)
|
||||||
return document
|
return document
|
||||||
|
|
||||||
async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None:
|
async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None:
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,43 @@
|
||||||
"""Celery tasks for connector indexing."""
|
"""Celery tasks for connector indexing."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
|
||||||
|
from celery import current_task
|
||||||
|
|
||||||
from app.celery_app import celery_app
|
from app.celery_app import celery_app
|
||||||
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
|
from app.observability import metrics as ot_metrics
|
||||||
|
from app.tasks.celery_tasks import (
|
||||||
|
get_celery_session_maker,
|
||||||
|
run_async_celery_task as _run_async_celery_task,
|
||||||
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def run_async_celery_task[T](coro_factory: Callable[[], Awaitable[T]]) -> T:
|
||||||
|
"""Run connector sync work and record aggregate connector metrics."""
|
||||||
|
task_name = getattr(current_task, "name", None) or "unknown"
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
status = "failed"
|
||||||
|
try:
|
||||||
|
result = _run_async_celery_task(coro_factory)
|
||||||
|
status = "success"
|
||||||
|
return result
|
||||||
|
finally:
|
||||||
|
elapsed_s = time.perf_counter() - t0
|
||||||
|
ot_metrics.record_connector_sync_duration(
|
||||||
|
elapsed_s,
|
||||||
|
connector_type=task_name,
|
||||||
|
)
|
||||||
|
ot_metrics.record_connector_sync_outcome(
|
||||||
|
connector_type=task_name,
|
||||||
|
status=status,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _handle_greenlet_error(e: Exception, task_name: str, connector_id: int) -> None:
|
def _handle_greenlet_error(e: Exception, task_name: str, connector_id: int) -> None:
|
||||||
"""
|
"""
|
||||||
Handle greenlet_spawn errors with detailed logging for debugging.
|
Handle greenlet_spawn errors with detailed logging for debugging.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue