feat(observability): add chat subagent and ETL telemetry primitives

This commit is contained in:
Anish Sarkar 2026-05-22 13:47:50 +05:30
parent 21d9b1f218
commit f7f49de109
2 changed files with 334 additions and 0 deletions

View file

@ -195,6 +195,73 @@ def _perf_elapsed():
) )
@lru_cache(maxsize=1)
def _chat_request_duration():
return _get_meter().create_histogram(
"surfsense.chat.request.duration",
unit="ms",
description="Duration of SurfSense streamed chat requests.",
)
@lru_cache(maxsize=1)
def _chat_request_outcome():
return _get_meter().create_counter(
"surfsense.chat.request.outcome",
description="Count of SurfSense chat request outcomes.",
)
@lru_cache(maxsize=1)
def _subagent_invoke_duration():
return _get_meter().create_histogram(
"surfsense.subagent.invoke.duration",
unit="ms",
description="Duration of SurfSense subagent invocations.",
)
@lru_cache(maxsize=1)
def _subagent_invoke_outcome():
return _get_meter().create_counter(
"surfsense.subagent.invoke.outcome",
description="Count of SurfSense subagent invocation outcomes.",
)
@lru_cache(maxsize=1)
def _etl_extract_duration():
return _get_meter().create_histogram(
"surfsense.etl.extract.duration",
unit="s",
description="Duration of SurfSense ETL extraction.",
)
@lru_cache(maxsize=1)
def _etl_extract_outcome():
return _get_meter().create_counter(
"surfsense.etl.extract.outcome",
description="Count of SurfSense ETL extraction outcomes.",
)
@lru_cache(maxsize=1)
def _celery_heartbeat_refreshes():
return _get_meter().create_counter(
"surfsense.celery.heartbeat.refreshes",
description="Count of SurfSense Celery heartbeat refreshes.",
)
@lru_cache(maxsize=1)
def _celery_heartbeat_failures():
return _get_meter().create_counter(
"surfsense.celery.heartbeat.failures",
description="Count of SurfSense Celery heartbeat failures.",
)
def record_model_call_duration( def record_model_call_duration(
duration_ms: float, *, model: str | None, provider: str | None duration_ms: float, *, model: str | None, provider: str | None
) -> None: ) -> None:
@ -312,6 +379,111 @@ def record_perf_elapsed(duration_ms: float, *, label: str) -> None:
_record(_perf_elapsed(), duration_ms, {"label": label}) _record(_perf_elapsed(), duration_ms, {"label": label})
def record_chat_request_duration(
duration_ms: float,
*,
flow: str,
outcome: str,
agent_mode: str | None = None,
) -> None:
_record(
_chat_request_duration(),
duration_ms,
{"chat.flow": flow, "outcome": outcome, "agent.mode": agent_mode},
)
def record_chat_request_outcome(
*,
flow: str,
outcome: str,
agent_mode: str | None = None,
) -> None:
_add(
_chat_request_outcome(),
1,
{"chat.flow": flow, "outcome": outcome, "agent.mode": agent_mode},
)
def record_subagent_invoke_duration(
duration_ms: float,
*,
subagent_type: str,
path: str | None,
outcome: str,
) -> None:
_record(
_subagent_invoke_duration(),
duration_ms,
{
"subagent.type": subagent_type,
"subagent.path": path or "unknown",
"outcome": outcome,
},
)
def record_subagent_invoke_outcome(
*,
subagent_type: str,
path: str | None,
outcome: str,
) -> None:
_add(
_subagent_invoke_outcome(),
1,
{
"subagent.type": subagent_type,
"subagent.path": path or "unknown",
"outcome": outcome,
},
)
def record_etl_extract_duration(
duration_s: float,
*,
etl_service: str | None,
content_type: str | None,
status: str,
) -> None:
_record(
_etl_extract_duration(),
duration_s,
{
"etl.service": etl_service or "unknown",
"content.type": content_type or "unknown",
"status": status,
},
)
def record_etl_extract_outcome(
*,
etl_service: str | None,
content_type: str | None,
status: str,
) -> None:
_add(
_etl_extract_outcome(),
1,
{
"etl.service": etl_service or "unknown",
"content.type": content_type or "unknown",
"status": status,
},
)
def record_celery_heartbeat_refresh(*, heartbeat_type: str) -> None:
_add(_celery_heartbeat_refreshes(), 1, {"heartbeat.type": heartbeat_type})
def record_celery_heartbeat_failure(*, heartbeat_type: str) -> None:
_add(_celery_heartbeat_failures(), 1, {"heartbeat.type": heartbeat_type})
def _runtime_snapshot_value(key: str, transform: Any = None) -> list[Any]: def _runtime_snapshot_value(key: str, transform: Any = None) -> list[Any]:
from opentelemetry.metrics import Observation from opentelemetry.metrics import Observation
@ -398,9 +570,15 @@ def register_runtime_observables() -> None:
__all__ = [ __all__ = [
"record_auth_failure", "record_auth_failure",
"record_celery_heartbeat_failure",
"record_celery_heartbeat_refresh",
"record_chat_request_duration",
"record_chat_request_outcome",
"record_compaction_run", "record_compaction_run",
"record_connector_sync_duration", "record_connector_sync_duration",
"record_connector_sync_outcome", "record_connector_sync_outcome",
"record_etl_extract_duration",
"record_etl_extract_outcome",
"record_indexing_document_duration", "record_indexing_document_duration",
"record_indexing_document_outcome", "record_indexing_document_outcome",
"record_interrupt", "record_interrupt",
@ -410,6 +588,8 @@ __all__ = [
"record_perf_elapsed", "record_perf_elapsed",
"record_permission_ask", "record_permission_ask",
"record_rate_limit_rejection", "record_rate_limit_rejection",
"record_subagent_invoke_duration",
"record_subagent_invoke_outcome",
"record_tool_call_duration", "record_tool_call_duration",
"record_tool_call_error", "record_tool_call_error",
"register_runtime_observables", "register_runtime_observables",

View file

@ -244,6 +244,152 @@ def kb_persist_span(
return span("kb.persist", attributes=attrs) return span("kb.persist", attributes=attrs)
def chat_request_span(
*,
chat_id: int | None = None,
search_space_id: int | None = None,
flow: str | None = None,
request_id: str | None = None,
turn_id: str | None = None,
filesystem_mode: str | None = None,
client_platform: str | None = None,
agent_mode: str | None = None,
extra: dict[str, Any] | None = None,
):
"""Parent span for a single streamed chat or resume turn."""
attrs: dict[str, Any] = {}
if chat_id is not None:
attrs["chat.id"] = int(chat_id)
if search_space_id is not None:
attrs["search_space.id"] = int(search_space_id)
if flow:
attrs["chat.flow"] = flow
if request_id:
attrs["request.id"] = request_id
if turn_id:
attrs["turn.id"] = turn_id
if filesystem_mode:
attrs["filesystem.mode"] = filesystem_mode
if client_platform:
attrs["client.platform"] = client_platform
if agent_mode:
attrs["agent.mode"] = agent_mode
if extra:
attrs.update(extra)
return span("chat.request", attributes=attrs)
def subagent_invoke_span(
*,
subagent_type: str,
path: str | None = None,
extra: dict[str, Any] | None = None,
):
"""Span around invoking a delegated subagent from the main agent."""
attrs: dict[str, Any] = {"subagent.type": subagent_type}
if path:
attrs["subagent.path"] = path
if extra:
attrs.update(extra)
return span("subagent.invoke", attributes=attrs)
def connector_sync_span(
*,
connector_type: str | None,
extra: dict[str, Any] | None = None,
):
"""Business-level span around connector indexing task execution."""
attrs: dict[str, Any] = {"connector.type": connector_type or "unknown"}
if extra:
attrs.update(extra)
return span("connector.sync", attributes=attrs)
def etl_extract_span(
*,
content_type: str | None = None,
file_extension: str | None = None,
processing_mode: str | None = None,
extra: dict[str, Any] | None = None,
):
"""Span around top-level ETL extraction for a file."""
attrs: dict[str, Any] = {}
if content_type:
attrs["content.type"] = content_type
if file_extension:
attrs["file.extension"] = file_extension
if processing_mode:
attrs["processing.mode"] = processing_mode
if extra:
attrs.update(extra)
return span("etl.extract", attributes=attrs)
def etl_parse_span(
*,
etl_service: str | None,
content_type: str | None = None,
file_extension: str | None = None,
processing_mode: str | None = None,
extra: dict[str, Any] | None = None,
):
"""Span around a concrete ETL parser/backend call."""
attrs: dict[str, Any] = {"etl.service": etl_service or "unknown"}
if content_type:
attrs["content.type"] = content_type
if file_extension:
attrs["file.extension"] = file_extension
if processing_mode:
attrs["processing.mode"] = processing_mode
if extra:
attrs.update(extra)
return span("etl.parse", attributes=attrs)
def etl_ocr_span(
*,
etl_service: str | None,
file_extension: str | None = None,
extra: dict[str, Any] | None = None,
):
"""Span around OCR extraction from image content."""
attrs: dict[str, Any] = {"etl.service": etl_service or "unknown"}
if file_extension:
attrs["file.extension"] = file_extension
if extra:
attrs.update(extra)
return span("etl.ocr", attributes=attrs)
def etl_picture_describe_span(
*,
image_count: int | None = None,
extra: dict[str, Any] | None = None,
):
"""Span around describing embedded images in a document."""
attrs: dict[str, Any] = {}
if image_count is not None:
attrs["image.count"] = int(image_count)
if extra:
attrs.update(extra)
return span("etl.picture.describe", attributes=attrs)
def etl_picture_ocr_span(
*,
file_extension: str | None = None,
extra: dict[str, Any] | None = None,
):
"""Span around per-image OCR during picture description."""
attrs: dict[str, Any] = {}
if file_extension:
attrs["file.extension"] = file_extension
if extra:
attrs.update(extra)
return span("etl.picture.ocr", attributes=attrs)
def compaction_span( def compaction_span(
*, *,
reason: str | None = None, reason: str | None = None,
@ -306,7 +452,14 @@ def reload_for_tests() -> bool:
__all__ = [ __all__ = [
"chat_request_span",
"compaction_span", "compaction_span",
"connector_sync_span",
"etl_extract_span",
"etl_ocr_span",
"etl_parse_span",
"etl_picture_describe_span",
"etl_picture_ocr_span",
"interrupt_span", "interrupt_span",
"is_enabled", "is_enabled",
"kb_persist_span", "kb_persist_span",
@ -315,5 +468,6 @@ __all__ = [
"permission_asked_span", "permission_asked_span",
"reload_for_tests", "reload_for_tests",
"span", "span",
"subagent_invoke_span",
"tool_call_span", "tool_call_span",
] ]