mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-31 19:45:15 +02:00
feat: add performance logging middleware and enhance performance tracking across services
- Introduced RequestPerfMiddleware to log request performance metrics, including slow request thresholds. - Updated various services and retrievers to utilize the new performance logging utility for better tracking of execution times. - Enhanced existing methods with detailed performance logs for operations such as embedding, searching, and indexing. - Removed deprecated logging setup in stream_new_chat and replaced it with the new performance logger.
This commit is contained in:
parent
68bb196d45
commit
664c43ca13
11 changed files with 430 additions and 36 deletions
|
|
@ -28,8 +28,9 @@ from app.agents.new_chat.system_prompt import (
|
|||
from app.agents.new_chat.tools.registry import build_tools_async
|
||||
from app.db import ChatVisibility
|
||||
from app.services.connector_service import ConnectorService
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
_perf_log = logging.getLogger("surfsense.perf")
|
||||
_perf_log = get_perf_logger()
|
||||
|
||||
# =============================================================================
|
||||
# Connector Type Mapping
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ This module provides:
|
|||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
|
|
@ -19,6 +20,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||
|
||||
from app.db import async_session_maker
|
||||
from app.services.connector_service import ConnectorService
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
# =============================================================================
|
||||
# Connector Constants and Normalization
|
||||
|
|
@ -412,6 +414,9 @@ async def search_knowledge_base_async(
|
|||
Returns:
|
||||
Formatted string with search results
|
||||
"""
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
all_documents: list[dict[str, Any]] = []
|
||||
|
||||
# Resolve date range (default last 2 years)
|
||||
|
|
@ -423,6 +428,10 @@ async def search_knowledge_base_async(
|
|||
)
|
||||
|
||||
connectors = _normalize_connectors(connectors_to_search, available_connectors)
|
||||
perf.info(
|
||||
"[kb_search] searching %d connectors: %s (space=%d, top_k=%d)",
|
||||
len(connectors), connectors[:5], search_space_id, top_k,
|
||||
)
|
||||
|
||||
connector_specs: dict[str, tuple[str, bool, bool, dict[str, Any]]] = {
|
||||
"YOUTUBE_VIDEO": ("search_youtube", True, True, {}),
|
||||
|
|
@ -492,20 +501,32 @@ async def search_knowledge_base_async(
|
|||
try:
|
||||
# Use isolated session per connector. Shared AsyncSession cannot safely
|
||||
# run concurrent DB operations.
|
||||
t_conn = time.perf_counter()
|
||||
async with semaphore, async_session_maker() as isolated_session:
|
||||
isolated_connector_service = ConnectorService(
|
||||
isolated_session, search_space_id
|
||||
)
|
||||
connector_method = getattr(isolated_connector_service, method_name)
|
||||
_, chunks = await connector_method(**kwargs)
|
||||
perf.info(
|
||||
"[kb_search] connector=%s results=%d in %.3fs",
|
||||
connector, len(chunks), time.perf_counter() - t_conn,
|
||||
)
|
||||
return chunks
|
||||
except Exception as e:
|
||||
print(f"Error searching connector {connector}: {e}")
|
||||
perf.warning(
|
||||
"[kb_search] connector=%s FAILED in %.3fs: %s",
|
||||
connector, time.perf_counter() - t_conn, e,
|
||||
)
|
||||
return []
|
||||
|
||||
t_gather = time.perf_counter()
|
||||
connector_results = await asyncio.gather(
|
||||
*[_search_one_connector(connector) for connector in connectors]
|
||||
)
|
||||
perf.info(
|
||||
"[kb_search] all connectors gathered in %.3fs", time.perf_counter() - t_gather,
|
||||
)
|
||||
for chunks in connector_results:
|
||||
all_documents.extend(chunks)
|
||||
|
||||
|
|
@ -552,7 +573,12 @@ async def search_knowledge_base_async(
|
|||
deduplicated.append(doc)
|
||||
|
||||
output_budget = _compute_tool_output_budget(max_input_tokens)
|
||||
return format_documents_for_context(deduplicated, max_chars=output_budget)
|
||||
result = format_documents_for_context(deduplicated, max_chars=output_budget)
|
||||
perf.info(
|
||||
"[kb_search] TOTAL in %.3fs total_docs=%d deduped=%d output_chars=%d space=%d",
|
||||
time.perf_counter() - t0, len(all_documents), len(deduplicated), len(result), search_space_id,
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def _build_connector_docstring(available_connectors: list[str] | None) -> str:
|
||||
|
|
|
|||
|
|
@ -15,6 +15,9 @@ from slowapi.errors import RateLimitExceeded
|
|||
from slowapi.middleware import SlowAPIMiddleware
|
||||
from slowapi.util import get_remote_address
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
|
||||
from starlette.requests import Request as StarletteRequest
|
||||
from starlette.responses import Response as StarletteResponse
|
||||
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
|
||||
|
||||
from app.agents.new_chat.checkpointer import (
|
||||
|
|
@ -28,6 +31,7 @@ from app.routes.auth_routes import router as auth_router
|
|||
from app.schemas import UserCreate, UserRead, UserUpdate
|
||||
from app.tasks.surfsense_docs_indexer import seed_surfsense_docs
|
||||
from app.users import SECRET, auth_backend, current_active_user, fastapi_users
|
||||
from app.utils.perf import get_perf_logger, log_system_snapshot
|
||||
|
||||
rate_limit_logger = logging.getLogger("surfsense.rate_limit")
|
||||
|
||||
|
|
@ -244,6 +248,63 @@ app = FastAPI(lifespan=lifespan)
|
|||
app.state.limiter = limiter
|
||||
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Request-level performance middleware
|
||||
# ---------------------------------------------------------------------------
|
||||
# Logs wall-clock time, method, path, and status for every request so we can
|
||||
# spot slow endpoints in production logs.
|
||||
|
||||
_PERF_SLOW_REQUEST_THRESHOLD = float(
|
||||
__import__("os").environ.get("PERF_SLOW_REQUEST_MS", "2000")
|
||||
)
|
||||
|
||||
|
||||
class RequestPerfMiddleware(BaseHTTPMiddleware):
|
||||
"""Middleware that logs per-request wall-clock time.
|
||||
|
||||
- ALL requests are logged at DEBUG level.
|
||||
- Requests exceeding PERF_SLOW_REQUEST_MS (default 2000ms) are logged at
|
||||
WARNING level with a system snapshot so we can correlate slow responses
|
||||
with CPU/memory usage at that moment.
|
||||
"""
|
||||
|
||||
async def dispatch(
|
||||
self, request: StarletteRequest, call_next: RequestResponseEndpoint
|
||||
) -> StarletteResponse:
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
response = await call_next(request)
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
|
||||
path = request.url.path
|
||||
method = request.method
|
||||
status = response.status_code
|
||||
|
||||
perf.debug(
|
||||
"[request] %s %s -> %d in %.1fms",
|
||||
method,
|
||||
path,
|
||||
status,
|
||||
elapsed_ms,
|
||||
)
|
||||
|
||||
if elapsed_ms > _PERF_SLOW_REQUEST_THRESHOLD:
|
||||
perf.warning(
|
||||
"[SLOW_REQUEST] %s %s -> %d in %.1fms (threshold=%.0fms)",
|
||||
method,
|
||||
path,
|
||||
status,
|
||||
elapsed_ms,
|
||||
_PERF_SLOW_REQUEST_THRESHOLD,
|
||||
)
|
||||
log_system_snapshot("slow_request")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
app.add_middleware(RequestPerfMiddleware)
|
||||
|
||||
# Add SlowAPI middleware for automatic rate limiting
|
||||
# Uses Starlette BaseHTTPMiddleware (not the raw ASGI variant) to avoid
|
||||
# corrupting StreamingResponse — SlowAPIASGIMiddleware re-sends
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import contextlib
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sqlalchemy import delete, select
|
||||
|
|
@ -44,6 +45,7 @@ from app.indexing_pipeline.pipeline_logger import (
|
|||
log_retryable_llm_error,
|
||||
log_unexpected_error,
|
||||
)
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
|
||||
class IndexingPipelineService:
|
||||
|
|
@ -58,6 +60,9 @@ class IndexingPipelineService:
|
|||
"""
|
||||
Persist new documents and detect changes, returning only those that need indexing.
|
||||
"""
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
documents = []
|
||||
seen_hashes: set[str] = set()
|
||||
batch_ctx = PipelineLogContext(
|
||||
|
|
@ -140,11 +145,12 @@ class IndexingPipelineService:
|
|||
|
||||
try:
|
||||
await self.session.commit()
|
||||
perf.info(
|
||||
"[indexing] prepare_for_indexing in %.3fs input=%d output=%d",
|
||||
time.perf_counter() - t0, len(connector_docs), len(documents),
|
||||
)
|
||||
return documents
|
||||
except IntegrityError:
|
||||
# A concurrent worker committed a document with the same content_hash
|
||||
# or unique_identifier_hash between our check and our INSERT.
|
||||
# The document already exists — roll back and let the next sync run handle it.
|
||||
log_race_condition(batch_ctx)
|
||||
await self.session.rollback()
|
||||
return []
|
||||
|
|
@ -165,26 +171,39 @@ class IndexingPipelineService:
|
|||
unique_id=connector_doc.unique_id,
|
||||
doc_id=document.id,
|
||||
)
|
||||
perf = get_perf_logger()
|
||||
t_index = time.perf_counter()
|
||||
try:
|
||||
log_index_started(ctx)
|
||||
document.status = DocumentStatus.processing()
|
||||
await self.session.commit()
|
||||
|
||||
t_step = time.perf_counter()
|
||||
if connector_doc.should_summarize and llm is not None:
|
||||
content = await summarize_document(
|
||||
connector_doc.source_markdown, llm, connector_doc.metadata
|
||||
)
|
||||
perf.info(
|
||||
"[indexing] summarize_document doc=%d in %.3fs",
|
||||
document.id, time.perf_counter() - t_step,
|
||||
)
|
||||
elif connector_doc.should_summarize and connector_doc.fallback_summary:
|
||||
content = connector_doc.fallback_summary
|
||||
else:
|
||||
content = connector_doc.source_markdown
|
||||
|
||||
t_step = time.perf_counter()
|
||||
embedding = embed_text(content)
|
||||
perf.debug(
|
||||
"[indexing] embed_text (summary) doc=%d in %.3fs",
|
||||
document.id, time.perf_counter() - t_step,
|
||||
)
|
||||
|
||||
await self.session.execute(
|
||||
delete(Chunk).where(Chunk.document_id == document.id)
|
||||
)
|
||||
|
||||
t_step = time.perf_counter()
|
||||
chunks = [
|
||||
Chunk(content=text, embedding=embed_text(text))
|
||||
for text in chunk_text(
|
||||
|
|
@ -192,6 +211,10 @@ class IndexingPipelineService:
|
|||
use_code_chunker=connector_doc.should_use_code_chunker,
|
||||
)
|
||||
]
|
||||
perf.info(
|
||||
"[indexing] chunk+embed doc=%d chunks=%d in %.3fs",
|
||||
document.id, len(chunks), time.perf_counter() - t_step,
|
||||
)
|
||||
|
||||
document.content = content
|
||||
document.embedding = embedding
|
||||
|
|
@ -199,6 +222,10 @@ class IndexingPipelineService:
|
|||
document.updated_at = datetime.now(UTC)
|
||||
document.status = DocumentStatus.ready()
|
||||
await self.session.commit()
|
||||
perf.info(
|
||||
"[indexing] index TOTAL doc=%d chunks=%d in %.3fs",
|
||||
document.id, len(chunks), time.perf_counter() - t_index,
|
||||
)
|
||||
log_index_success(ctx, chunk_count=len(chunks))
|
||||
|
||||
except RETRYABLE_LLM_ERRORS as e:
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
|
||||
class ChucksHybridSearchRetriever:
|
||||
def __init__(self, db_session):
|
||||
|
|
@ -38,9 +41,17 @@ class ChucksHybridSearchRetriever:
|
|||
from app.config import config
|
||||
from app.db import Chunk, Document
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get embedding for the query
|
||||
embedding_model = config.embedding_model_instance
|
||||
t_embed = time.perf_counter()
|
||||
query_embedding = embedding_model.embed(query_text)
|
||||
perf.debug(
|
||||
"[chunk_search] vector_search embedding in %.3fs",
|
||||
time.perf_counter() - t_embed,
|
||||
)
|
||||
|
||||
# Build the query filtered by search space
|
||||
query = (
|
||||
|
|
@ -60,8 +71,13 @@ class ChucksHybridSearchRetriever:
|
|||
query = query.order_by(Chunk.embedding.op("<=>")(query_embedding)).limit(top_k)
|
||||
|
||||
# Execute the query
|
||||
t_db = time.perf_counter()
|
||||
result = await self.db_session.execute(query)
|
||||
chunks = result.scalars().all()
|
||||
perf.info(
|
||||
"[chunk_search] vector_search DB query in %.3fs results=%d (total %.3fs) space=%d",
|
||||
time.perf_counter() - t_db, len(chunks), time.perf_counter() - t0, search_space_id,
|
||||
)
|
||||
|
||||
return chunks
|
||||
|
||||
|
|
@ -91,6 +107,9 @@ class ChucksHybridSearchRetriever:
|
|||
|
||||
from app.db import Chunk, Document
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Create tsvector and tsquery for PostgreSQL full-text search
|
||||
tsvector = func.to_tsvector("english", Chunk.content)
|
||||
tsquery = func.plainto_tsquery("english", query_text)
|
||||
|
|
@ -118,6 +137,10 @@ class ChucksHybridSearchRetriever:
|
|||
# Execute the query
|
||||
result = await self.db_session.execute(query)
|
||||
chunks = result.scalars().all()
|
||||
perf.info(
|
||||
"[chunk_search] full_text_search in %.3fs results=%d space=%d",
|
||||
time.perf_counter() - t0, len(chunks), search_space_id,
|
||||
)
|
||||
|
||||
return chunks
|
||||
|
||||
|
|
@ -157,9 +180,17 @@ class ChucksHybridSearchRetriever:
|
|||
from app.config import config
|
||||
from app.db import Chunk, Document, DocumentType
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get embedding for the query
|
||||
embedding_model = config.embedding_model_instance
|
||||
t_embed = time.perf_counter()
|
||||
query_embedding = embedding_model.embed(query_text)
|
||||
perf.debug(
|
||||
"[chunk_search] hybrid_search embedding in %.3fs",
|
||||
time.perf_counter() - t_embed,
|
||||
)
|
||||
|
||||
# RRF constants
|
||||
k = 60
|
||||
|
|
@ -254,9 +285,14 @@ class ChucksHybridSearchRetriever:
|
|||
.limit(top_k)
|
||||
)
|
||||
|
||||
# Execute the query
|
||||
# Execute the RRF query
|
||||
t_rrf = time.perf_counter()
|
||||
result = await self.db_session.execute(final_query)
|
||||
chunks_with_scores = result.all()
|
||||
perf.info(
|
||||
"[chunk_search] hybrid_search RRF query in %.3fs results=%d space=%d type=%s",
|
||||
time.perf_counter() - t_rrf, len(chunks_with_scores), search_space_id, document_type,
|
||||
)
|
||||
|
||||
# If no results were found, return an empty list
|
||||
if not chunks_with_scores:
|
||||
|
|
@ -354,4 +390,8 @@ class ChucksHybridSearchRetriever:
|
|||
)
|
||||
final_docs.append(entry)
|
||||
|
||||
perf.info(
|
||||
"[chunk_search] hybrid_search TOTAL in %.3fs docs=%d space=%d type=%s",
|
||||
time.perf_counter() - t0, len(final_docs), search_space_id, document_type,
|
||||
)
|
||||
return final_docs
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
|
||||
class DocumentHybridSearchRetriever:
|
||||
def __init__(self, db_session):
|
||||
|
|
@ -38,6 +41,9 @@ class DocumentHybridSearchRetriever:
|
|||
from app.config import config
|
||||
from app.db import Document
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get embedding for the query
|
||||
embedding_model = config.embedding_model_instance
|
||||
query_embedding = embedding_model.embed(query_text)
|
||||
|
|
@ -63,6 +69,10 @@ class DocumentHybridSearchRetriever:
|
|||
# Execute the query
|
||||
result = await self.db_session.execute(query)
|
||||
documents = result.scalars().all()
|
||||
perf.info(
|
||||
"[doc_search] vector_search in %.3fs results=%d space=%d",
|
||||
time.perf_counter() - t0, len(documents), search_space_id,
|
||||
)
|
||||
|
||||
return documents
|
||||
|
||||
|
|
@ -92,6 +102,9 @@ class DocumentHybridSearchRetriever:
|
|||
|
||||
from app.db import Document
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Create tsvector and tsquery for PostgreSQL full-text search
|
||||
tsvector = func.to_tsvector("english", Document.content)
|
||||
tsquery = func.plainto_tsquery("english", query_text)
|
||||
|
|
@ -118,6 +131,10 @@ class DocumentHybridSearchRetriever:
|
|||
# Execute the query
|
||||
result = await self.db_session.execute(query)
|
||||
documents = result.scalars().all()
|
||||
perf.info(
|
||||
"[doc_search] full_text_search in %.3fs results=%d space=%d",
|
||||
time.perf_counter() - t0, len(documents), search_space_id,
|
||||
)
|
||||
|
||||
return documents
|
||||
|
||||
|
|
@ -151,6 +168,9 @@ class DocumentHybridSearchRetriever:
|
|||
from app.config import config
|
||||
from app.db import Chunk, Document, DocumentType
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get embedding for the query
|
||||
embedding_model = config.embedding_model_instance
|
||||
query_embedding = embedding_model.embed(query_text)
|
||||
|
|
@ -303,4 +323,8 @@ class DocumentHybridSearchRetriever:
|
|||
)
|
||||
final_docs.append(entry)
|
||||
|
||||
perf.info(
|
||||
"[doc_search] hybrid_search TOTAL in %.3fs docs=%d space=%d type=%s",
|
||||
time.perf_counter() - t0, len(final_docs), search_space_id, document_type,
|
||||
)
|
||||
return final_docs
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import asyncio
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from urllib.parse import urljoin
|
||||
|
|
@ -18,6 +19,7 @@ from app.db import (
|
|||
)
|
||||
from app.retriever.chunks_hybrid_search import ChucksHybridSearchRetriever
|
||||
from app.retriever.documents_hybrid_search import DocumentHybridSearchRetriever
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
|
||||
class ConnectorService:
|
||||
|
|
@ -246,6 +248,9 @@ class ConnectorService:
|
|||
Returns:
|
||||
List of combined and deduplicated document results
|
||||
"""
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# RRF constant
|
||||
k = 60
|
||||
|
||||
|
|
@ -259,6 +264,7 @@ class ConnectorService:
|
|||
# "This session is provisioning a new connection; concurrent operations are not permitted"
|
||||
#
|
||||
# So we run them sequentially.
|
||||
t_chunk = time.perf_counter()
|
||||
chunk_results = await self.chunk_retriever.hybrid_search(
|
||||
query_text=query_text,
|
||||
top_k=retriever_top_k,
|
||||
|
|
@ -267,6 +273,12 @@ class ConnectorService:
|
|||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
)
|
||||
perf.info(
|
||||
"[connector_svc] _combined_rrf chunk_retriever in %.3fs results=%d type=%s",
|
||||
time.perf_counter() - t_chunk, len(chunk_results), document_type,
|
||||
)
|
||||
|
||||
t_doc = time.perf_counter()
|
||||
doc_results = await self.document_retriever.hybrid_search(
|
||||
query_text=query_text,
|
||||
top_k=retriever_top_k,
|
||||
|
|
@ -275,6 +287,10 @@ class ConnectorService:
|
|||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
)
|
||||
perf.info(
|
||||
"[connector_svc] _combined_rrf doc_retriever in %.3fs results=%d type=%s",
|
||||
time.perf_counter() - t_doc, len(doc_results), document_type,
|
||||
)
|
||||
|
||||
# Helper to extract document_id from our doc-grouped result
|
||||
def _doc_id(item: dict[str, Any]) -> int | None:
|
||||
|
|
@ -335,6 +351,10 @@ class ConnectorService:
|
|||
result["chunks"] = doc_data[did]["chunks"]
|
||||
combined_results.append(result)
|
||||
|
||||
perf.info(
|
||||
"[connector_svc] _combined_rrf_search TOTAL in %.3fs results=%d type=%s space=%d",
|
||||
time.perf_counter() - t0, len(combined_results), document_type, search_space_id,
|
||||
)
|
||||
return combined_results
|
||||
|
||||
def _get_doc_url(self, metadata: dict[str, Any]) -> str:
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ synchronous ChatLiteLLM-like interface and async methods.
|
|||
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.callbacks import CallbackManagerForLLMRun
|
||||
|
|
@ -26,6 +27,8 @@ from litellm.exceptions import (
|
|||
ContextWindowExceededError,
|
||||
)
|
||||
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_CONTEXT_OVERFLOW_PATTERNS = re.compile(
|
||||
|
|
@ -410,6 +413,10 @@ class ChatLiteLLMRouter(BaseChatModel):
|
|||
if not self._router:
|
||||
raise ValueError("Router not initialized")
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
msg_count = len(messages)
|
||||
|
||||
# Convert LangChain messages to OpenAI format
|
||||
formatted_messages = self._convert_messages(messages)
|
||||
|
||||
|
|
@ -428,12 +435,28 @@ class ChatLiteLLMRouter(BaseChatModel):
|
|||
**call_kwargs,
|
||||
)
|
||||
except ContextWindowExceededError as e:
|
||||
perf.warning(
|
||||
"[llm_router] _generate CONTEXT_OVERFLOW msgs=%d in %.3fs",
|
||||
msg_count, time.perf_counter() - t0,
|
||||
)
|
||||
raise ContextOverflowError(str(e)) from e
|
||||
except LiteLLMBadRequestError as e:
|
||||
if _is_context_overflow_error(e):
|
||||
perf.warning(
|
||||
"[llm_router] _generate CONTEXT_OVERFLOW msgs=%d in %.3fs",
|
||||
msg_count, time.perf_counter() - t0,
|
||||
)
|
||||
raise ContextOverflowError(str(e)) from e
|
||||
raise
|
||||
|
||||
elapsed = time.perf_counter() - t0
|
||||
perf.info(
|
||||
"[llm_router] _generate completed msgs=%d tools=%d in %.3fs",
|
||||
msg_count,
|
||||
len(self._bound_tools) if self._bound_tools else 0,
|
||||
elapsed,
|
||||
)
|
||||
|
||||
# Convert response to ChatResult with potential tool calls
|
||||
message = self._convert_response_to_message(response.choices[0].message)
|
||||
generation = ChatGeneration(message=message)
|
||||
|
|
@ -453,6 +476,10 @@ class ChatLiteLLMRouter(BaseChatModel):
|
|||
if not self._router:
|
||||
raise ValueError("Router not initialized")
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
msg_count = len(messages)
|
||||
|
||||
# Convert LangChain messages to OpenAI format
|
||||
formatted_messages = self._convert_messages(messages)
|
||||
|
||||
|
|
@ -471,12 +498,28 @@ class ChatLiteLLMRouter(BaseChatModel):
|
|||
**call_kwargs,
|
||||
)
|
||||
except ContextWindowExceededError as e:
|
||||
perf.warning(
|
||||
"[llm_router] _agenerate CONTEXT_OVERFLOW msgs=%d in %.3fs",
|
||||
msg_count, time.perf_counter() - t0,
|
||||
)
|
||||
raise ContextOverflowError(str(e)) from e
|
||||
except LiteLLMBadRequestError as e:
|
||||
if _is_context_overflow_error(e):
|
||||
perf.warning(
|
||||
"[llm_router] _agenerate CONTEXT_OVERFLOW msgs=%d in %.3fs",
|
||||
msg_count, time.perf_counter() - t0,
|
||||
)
|
||||
raise ContextOverflowError(str(e)) from e
|
||||
raise
|
||||
|
||||
elapsed = time.perf_counter() - t0
|
||||
perf.info(
|
||||
"[llm_router] _agenerate completed msgs=%d tools=%d in %.3fs",
|
||||
msg_count,
|
||||
len(self._bound_tools) if self._bound_tools else 0,
|
||||
elapsed,
|
||||
)
|
||||
|
||||
# Convert response to ChatResult with potential tool calls
|
||||
message = self._convert_response_to_message(response.choices[0].message)
|
||||
generation = ChatGeneration(message=message)
|
||||
|
|
@ -541,6 +584,10 @@ class ChatLiteLLMRouter(BaseChatModel):
|
|||
if not self._router:
|
||||
raise ValueError("Router not initialized")
|
||||
|
||||
perf = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
msg_count = len(messages)
|
||||
|
||||
formatted_messages = self._convert_messages(messages)
|
||||
|
||||
# Add tools if bound
|
||||
|
|
@ -559,20 +606,48 @@ class ChatLiteLLMRouter(BaseChatModel):
|
|||
**call_kwargs,
|
||||
)
|
||||
except ContextWindowExceededError as e:
|
||||
perf.warning(
|
||||
"[llm_router] _astream CONTEXT_OVERFLOW msgs=%d in %.3fs",
|
||||
msg_count, time.perf_counter() - t0,
|
||||
)
|
||||
raise ContextOverflowError(str(e)) from e
|
||||
except LiteLLMBadRequestError as e:
|
||||
if _is_context_overflow_error(e):
|
||||
perf.warning(
|
||||
"[llm_router] _astream CONTEXT_OVERFLOW msgs=%d in %.3fs",
|
||||
msg_count, time.perf_counter() - t0,
|
||||
)
|
||||
raise ContextOverflowError(str(e)) from e
|
||||
raise
|
||||
|
||||
# Yield chunks asynchronously
|
||||
t_first_chunk = time.perf_counter()
|
||||
perf.info(
|
||||
"[llm_router] _astream connection established msgs=%d in %.3fs",
|
||||
msg_count, t_first_chunk - t0,
|
||||
)
|
||||
|
||||
chunk_count = 0
|
||||
first_chunk_logged = False
|
||||
async for chunk in response:
|
||||
if hasattr(chunk, "choices") and chunk.choices:
|
||||
delta = chunk.choices[0].delta
|
||||
chunk_msg = self._convert_delta_to_chunk(delta)
|
||||
if chunk_msg:
|
||||
chunk_count += 1
|
||||
if not first_chunk_logged:
|
||||
perf.info(
|
||||
"[llm_router] _astream first chunk in %.3fs (total %.3fs from start)",
|
||||
time.perf_counter() - t_first_chunk,
|
||||
time.perf_counter() - t0,
|
||||
)
|
||||
first_chunk_logged = True
|
||||
yield ChatGenerationChunk(message=chunk_msg)
|
||||
|
||||
perf.info(
|
||||
"[llm_router] _astream completed chunks=%d total=%.3fs",
|
||||
chunk_count, time.perf_counter() - t0,
|
||||
)
|
||||
|
||||
def _convert_messages(self, messages: list[BaseMessage]) -> list[dict]:
|
||||
"""Convert LangChain messages to OpenAI format."""
|
||||
from langchain_core.messages import (
|
||||
|
|
|
|||
|
|
@ -56,14 +56,9 @@ from app.services.chat_session_state_service import (
|
|||
from app.services.connector_service import ConnectorService
|
||||
from app.services.new_streaming_service import VercelStreamingService
|
||||
from app.utils.content_utils import bootstrap_history_from_db
|
||||
from app.utils.perf import get_perf_logger, log_system_snapshot
|
||||
|
||||
_perf_log = logging.getLogger("surfsense.perf")
|
||||
_perf_log.setLevel(logging.DEBUG)
|
||||
if not _perf_log.handlers:
|
||||
_h = logging.StreamHandler()
|
||||
_h.setFormatter(logging.Formatter("%(asctime)s [PERF] %(message)s"))
|
||||
_perf_log.addHandler(_h)
|
||||
_perf_log.propagate = False
|
||||
_perf_log = get_perf_logger()
|
||||
|
||||
_background_tasks: set[asyncio.Task] = set()
|
||||
|
||||
|
|
@ -1050,6 +1045,7 @@ async def stream_new_chat(
|
|||
streaming_service = VercelStreamingService()
|
||||
stream_result = StreamResult()
|
||||
_t_total = time.perf_counter()
|
||||
log_system_snapshot("stream_new_chat_START")
|
||||
|
||||
try:
|
||||
# Mark AI as responding to this user for live collaboration
|
||||
|
|
@ -1382,6 +1378,7 @@ async def stream_new_chat(
|
|||
time.perf_counter() - _t_stream_start,
|
||||
chat_id,
|
||||
)
|
||||
log_system_snapshot("stream_new_chat_END")
|
||||
|
||||
if stream_result.is_interrupted:
|
||||
yield streaming_service.format_finish_step()
|
||||
|
|
|
|||
122
surfsense_backend/app/utils/perf.py
Normal file
122
surfsense_backend/app/utils/perf.py
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
"""
|
||||
Centralized performance monitoring for SurfSense backend.
|
||||
|
||||
Provides:
|
||||
- A shared [PERF] logger used across all modules
|
||||
- perf_timer context manager for timing code blocks
|
||||
- perf_async_timer for async code blocks
|
||||
- system_snapshot() for CPU/memory profiling
|
||||
- RequestPerfMiddleware for per-request timing
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from contextlib import asynccontextmanager, contextmanager
|
||||
from typing import Any
|
||||
|
||||
_perf_log: logging.Logger | None = None
|
||||
|
||||
|
||||
def get_perf_logger() -> logging.Logger:
|
||||
"""Return the singleton [PERF] logger, creating it once on first call."""
|
||||
global _perf_log
|
||||
if _perf_log is None:
|
||||
_perf_log = logging.getLogger("surfsense.perf")
|
||||
_perf_log.setLevel(logging.DEBUG)
|
||||
if not _perf_log.handlers:
|
||||
h = logging.StreamHandler()
|
||||
h.setFormatter(logging.Formatter("%(asctime)s [PERF] %(message)s"))
|
||||
_perf_log.addHandler(h)
|
||||
_perf_log.propagate = False
|
||||
return _perf_log
|
||||
|
||||
|
||||
@contextmanager
|
||||
def perf_timer(label: str, *, extra: dict[str, Any] | None = None):
|
||||
"""Synchronous context manager that logs elapsed wall-clock time.
|
||||
|
||||
Usage:
|
||||
with perf_timer("[my_func] heavy computation"):
|
||||
...
|
||||
"""
|
||||
log = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
yield
|
||||
elapsed = time.perf_counter() - t0
|
||||
suffix = ""
|
||||
if extra:
|
||||
suffix = " " + " ".join(f"{k}={v}" for k, v in extra.items())
|
||||
log.info("%s in %.3fs%s", label, elapsed, suffix)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def perf_async_timer(label: str, *, extra: dict[str, Any] | None = None):
|
||||
"""Async context manager that logs elapsed wall-clock time.
|
||||
|
||||
Usage:
|
||||
async with perf_async_timer("[search] vector search"):
|
||||
...
|
||||
"""
|
||||
log = get_perf_logger()
|
||||
t0 = time.perf_counter()
|
||||
yield
|
||||
elapsed = time.perf_counter() - t0
|
||||
suffix = ""
|
||||
if extra:
|
||||
suffix = " " + " ".join(f"{k}={v}" for k, v in extra.items())
|
||||
log.info("%s in %.3fs%s", label, elapsed, suffix)
|
||||
|
||||
|
||||
def system_snapshot() -> dict[str, Any]:
|
||||
"""Capture a lightweight CPU + memory snapshot of the current process.
|
||||
|
||||
Returns a dict with:
|
||||
- rss_mb: Resident Set Size in MB
|
||||
- cpu_percent: CPU usage % since last call (per-process)
|
||||
- threads: number of active threads
|
||||
- open_fds: number of open file descriptors (Linux only)
|
||||
- asyncio_tasks: number of asyncio tasks currently alive
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
snapshot: dict[str, Any] = {}
|
||||
try:
|
||||
import psutil
|
||||
|
||||
proc = psutil.Process(os.getpid())
|
||||
mem = proc.memory_info()
|
||||
snapshot["rss_mb"] = round(mem.rss / 1024 / 1024, 1)
|
||||
snapshot["cpu_percent"] = proc.cpu_percent(interval=None)
|
||||
snapshot["threads"] = proc.num_threads()
|
||||
try:
|
||||
snapshot["open_fds"] = proc.num_fds()
|
||||
except AttributeError:
|
||||
snapshot["open_fds"] = -1
|
||||
except ImportError:
|
||||
snapshot["rss_mb"] = -1
|
||||
snapshot["cpu_percent"] = -1
|
||||
snapshot["threads"] = -1
|
||||
snapshot["open_fds"] = -1
|
||||
|
||||
try:
|
||||
all_tasks = asyncio.all_tasks()
|
||||
snapshot["asyncio_tasks"] = len(all_tasks)
|
||||
except RuntimeError:
|
||||
snapshot["asyncio_tasks"] = -1
|
||||
|
||||
return snapshot
|
||||
|
||||
|
||||
def log_system_snapshot(label: str = "system_snapshot") -> None:
|
||||
"""Capture and log a system snapshot."""
|
||||
snap = system_snapshot()
|
||||
get_perf_logger().info(
|
||||
"[%s] rss=%.1fMB cpu=%.1f%% threads=%d fds=%d asyncio_tasks=%d",
|
||||
label,
|
||||
snap["rss_mb"],
|
||||
snap["cpu_percent"],
|
||||
snap["threads"],
|
||||
snap["open_fds"],
|
||||
snap["asyncio_tasks"],
|
||||
)
|
||||
|
|
@ -2,27 +2,28 @@ import { EnumConnectorName } from "@/contracts/enums/connector";
|
|||
|
||||
// OAuth Connectors (Quick Connect)
|
||||
export const OAUTH_CONNECTORS = [
|
||||
{
|
||||
id: "google-drive-connector",
|
||||
title: "Google Drive",
|
||||
description: "Search your Drive files",
|
||||
connectorType: EnumConnectorName.GOOGLE_DRIVE_CONNECTOR,
|
||||
authEndpoint: "/api/v1/auth/google/drive/connector/add/",
|
||||
},
|
||||
{
|
||||
id: "google-gmail-connector",
|
||||
title: "Gmail",
|
||||
description: "Search through your emails",
|
||||
connectorType: EnumConnectorName.GOOGLE_GMAIL_CONNECTOR,
|
||||
authEndpoint: "/api/v1/auth/google/gmail/connector/add/",
|
||||
},
|
||||
{
|
||||
id: "google-calendar-connector",
|
||||
title: "Google Calendar",
|
||||
description: "Search through your events",
|
||||
connectorType: EnumConnectorName.GOOGLE_CALENDAR_CONNECTOR,
|
||||
authEndpoint: "/api/v1/auth/google/calendar/connector/add/",
|
||||
},
|
||||
// // Uncomment for managed Google Connections
|
||||
// {
|
||||
// id: "google-drive-connector",
|
||||
// title: "Google Drive",
|
||||
// description: "Search your Drive files",
|
||||
// connectorType: EnumConnectorName.GOOGLE_DRIVE_CONNECTOR,
|
||||
// authEndpoint: "/api/v1/auth/google/drive/connector/add/",
|
||||
// },
|
||||
// {
|
||||
// id: "google-gmail-connector",
|
||||
// title: "Gmail",
|
||||
// description: "Search through your emails",
|
||||
// connectorType: EnumConnectorName.GOOGLE_GMAIL_CONNECTOR,
|
||||
// authEndpoint: "/api/v1/auth/google/gmail/connector/add/",
|
||||
// },
|
||||
// {
|
||||
// id: "google-calendar-connector",
|
||||
// title: "Google Calendar",
|
||||
// description: "Search through your events",
|
||||
// connectorType: EnumConnectorName.GOOGLE_CALENDAR_CONNECTOR,
|
||||
// authEndpoint: "/api/v1/auth/google/calendar/connector/add/",
|
||||
// },
|
||||
{
|
||||
id: "airtable-connector",
|
||||
title: "Airtable",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue