diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index 3843b1687..af0d6bdc5 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -28,8 +28,9 @@ from app.agents.new_chat.system_prompt import ( from app.agents.new_chat.tools.registry import build_tools_async from app.db import ChatVisibility from app.services.connector_service import ConnectorService +from app.utils.perf import get_perf_logger -_perf_log = logging.getLogger("surfsense.perf") +_perf_log = get_perf_logger() # ============================================================================= # Connector Type Mapping diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py index 6989a1aa2..19f21bbc6 100644 --- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py +++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py @@ -10,6 +10,7 @@ This module provides: import asyncio import json +import time from datetime import datetime from typing import Any @@ -19,6 +20,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import async_session_maker from app.services.connector_service import ConnectorService +from app.utils.perf import get_perf_logger # ============================================================================= # Connector Constants and Normalization @@ -412,6 +414,9 @@ async def search_knowledge_base_async( Returns: Formatted string with search results """ + perf = get_perf_logger() + t0 = time.perf_counter() + all_documents: list[dict[str, Any]] = [] # Resolve date range (default last 2 years) @@ -423,6 +428,10 @@ async def search_knowledge_base_async( ) connectors = _normalize_connectors(connectors_to_search, available_connectors) + perf.info( + "[kb_search] searching %d connectors: %s (space=%d, top_k=%d)", + len(connectors), connectors[:5], search_space_id, top_k, + ) connector_specs: dict[str, tuple[str, bool, bool, dict[str, Any]]] = { "YOUTUBE_VIDEO": ("search_youtube", True, True, {}), @@ -492,20 +501,32 @@ async def search_knowledge_base_async( try: # Use isolated session per connector. Shared AsyncSession cannot safely # run concurrent DB operations. + t_conn = time.perf_counter() async with semaphore, async_session_maker() as isolated_session: isolated_connector_service = ConnectorService( isolated_session, search_space_id ) connector_method = getattr(isolated_connector_service, method_name) _, chunks = await connector_method(**kwargs) + perf.info( + "[kb_search] connector=%s results=%d in %.3fs", + connector, len(chunks), time.perf_counter() - t_conn, + ) return chunks except Exception as e: - print(f"Error searching connector {connector}: {e}") + perf.warning( + "[kb_search] connector=%s FAILED in %.3fs: %s", + connector, time.perf_counter() - t_conn, e, + ) return [] + t_gather = time.perf_counter() connector_results = await asyncio.gather( *[_search_one_connector(connector) for connector in connectors] ) + perf.info( + "[kb_search] all connectors gathered in %.3fs", time.perf_counter() - t_gather, + ) for chunks in connector_results: all_documents.extend(chunks) @@ -552,7 +573,12 @@ async def search_knowledge_base_async( deduplicated.append(doc) output_budget = _compute_tool_output_budget(max_input_tokens) - return format_documents_for_context(deduplicated, max_chars=output_budget) + result = format_documents_for_context(deduplicated, max_chars=output_budget) + perf.info( + "[kb_search] TOTAL in %.3fs total_docs=%d deduped=%d output_chars=%d space=%d", + time.perf_counter() - t0, len(all_documents), len(deduplicated), len(result), search_space_id, + ) + return result def _build_connector_docstring(available_connectors: list[str] | None) -> str: diff --git a/surfsense_backend/app/app.py b/surfsense_backend/app/app.py index 0a549abe5..e8843878f 100644 --- a/surfsense_backend/app/app.py +++ b/surfsense_backend/app/app.py @@ -15,6 +15,9 @@ from slowapi.errors import RateLimitExceeded from slowapi.middleware import SlowAPIMiddleware from slowapi.util import get_remote_address from sqlalchemy.ext.asyncio import AsyncSession +from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint +from starlette.requests import Request as StarletteRequest +from starlette.responses import Response as StarletteResponse from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware from app.agents.new_chat.checkpointer import ( @@ -28,6 +31,7 @@ from app.routes.auth_routes import router as auth_router from app.schemas import UserCreate, UserRead, UserUpdate from app.tasks.surfsense_docs_indexer import seed_surfsense_docs from app.users import SECRET, auth_backend, current_active_user, fastapi_users +from app.utils.perf import get_perf_logger, log_system_snapshot rate_limit_logger = logging.getLogger("surfsense.rate_limit") @@ -244,6 +248,63 @@ app = FastAPI(lifespan=lifespan) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + +# --------------------------------------------------------------------------- +# Request-level performance middleware +# --------------------------------------------------------------------------- +# Logs wall-clock time, method, path, and status for every request so we can +# spot slow endpoints in production logs. + +_PERF_SLOW_REQUEST_THRESHOLD = float( + __import__("os").environ.get("PERF_SLOW_REQUEST_MS", "2000") +) + + +class RequestPerfMiddleware(BaseHTTPMiddleware): + """Middleware that logs per-request wall-clock time. + + - ALL requests are logged at DEBUG level. + - Requests exceeding PERF_SLOW_REQUEST_MS (default 2000ms) are logged at + WARNING level with a system snapshot so we can correlate slow responses + with CPU/memory usage at that moment. + """ + + async def dispatch( + self, request: StarletteRequest, call_next: RequestResponseEndpoint + ) -> StarletteResponse: + perf = get_perf_logger() + t0 = time.perf_counter() + response = await call_next(request) + elapsed_ms = (time.perf_counter() - t0) * 1000 + + path = request.url.path + method = request.method + status = response.status_code + + perf.debug( + "[request] %s %s -> %d in %.1fms", + method, + path, + status, + elapsed_ms, + ) + + if elapsed_ms > _PERF_SLOW_REQUEST_THRESHOLD: + perf.warning( + "[SLOW_REQUEST] %s %s -> %d in %.1fms (threshold=%.0fms)", + method, + path, + status, + elapsed_ms, + _PERF_SLOW_REQUEST_THRESHOLD, + ) + log_system_snapshot("slow_request") + + return response + + +app.add_middleware(RequestPerfMiddleware) + # Add SlowAPI middleware for automatic rate limiting # Uses Starlette BaseHTTPMiddleware (not the raw ASGI variant) to avoid # corrupting StreamingResponse — SlowAPIASGIMiddleware re-sends diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index eea3d6e25..e6d7977cc 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -1,4 +1,5 @@ import contextlib +import time from datetime import UTC, datetime from sqlalchemy import delete, select @@ -44,6 +45,7 @@ from app.indexing_pipeline.pipeline_logger import ( log_retryable_llm_error, log_unexpected_error, ) +from app.utils.perf import get_perf_logger class IndexingPipelineService: @@ -58,6 +60,9 @@ class IndexingPipelineService: """ Persist new documents and detect changes, returning only those that need indexing. """ + perf = get_perf_logger() + t0 = time.perf_counter() + documents = [] seen_hashes: set[str] = set() batch_ctx = PipelineLogContext( @@ -140,11 +145,12 @@ class IndexingPipelineService: try: await self.session.commit() + perf.info( + "[indexing] prepare_for_indexing in %.3fs input=%d output=%d", + time.perf_counter() - t0, len(connector_docs), len(documents), + ) return documents except IntegrityError: - # A concurrent worker committed a document with the same content_hash - # or unique_identifier_hash between our check and our INSERT. - # The document already exists — roll back and let the next sync run handle it. log_race_condition(batch_ctx) await self.session.rollback() return [] @@ -165,26 +171,39 @@ class IndexingPipelineService: unique_id=connector_doc.unique_id, doc_id=document.id, ) + perf = get_perf_logger() + t_index = time.perf_counter() try: log_index_started(ctx) document.status = DocumentStatus.processing() await self.session.commit() + t_step = time.perf_counter() if connector_doc.should_summarize and llm is not None: content = await summarize_document( connector_doc.source_markdown, llm, connector_doc.metadata ) + perf.info( + "[indexing] summarize_document doc=%d in %.3fs", + document.id, time.perf_counter() - t_step, + ) elif connector_doc.should_summarize and connector_doc.fallback_summary: content = connector_doc.fallback_summary else: content = connector_doc.source_markdown + t_step = time.perf_counter() embedding = embed_text(content) + perf.debug( + "[indexing] embed_text (summary) doc=%d in %.3fs", + document.id, time.perf_counter() - t_step, + ) await self.session.execute( delete(Chunk).where(Chunk.document_id == document.id) ) + t_step = time.perf_counter() chunks = [ Chunk(content=text, embedding=embed_text(text)) for text in chunk_text( @@ -192,6 +211,10 @@ class IndexingPipelineService: use_code_chunker=connector_doc.should_use_code_chunker, ) ] + perf.info( + "[indexing] chunk+embed doc=%d chunks=%d in %.3fs", + document.id, len(chunks), time.perf_counter() - t_step, + ) document.content = content document.embedding = embedding @@ -199,6 +222,10 @@ class IndexingPipelineService: document.updated_at = datetime.now(UTC) document.status = DocumentStatus.ready() await self.session.commit() + perf.info( + "[indexing] index TOTAL doc=%d chunks=%d in %.3fs", + document.id, len(chunks), time.perf_counter() - t_index, + ) log_index_success(ctx, chunk_count=len(chunks)) except RETRYABLE_LLM_ERRORS as e: diff --git a/surfsense_backend/app/retriever/chunks_hybrid_search.py b/surfsense_backend/app/retriever/chunks_hybrid_search.py index 9aa301386..ed3f63acc 100644 --- a/surfsense_backend/app/retriever/chunks_hybrid_search.py +++ b/surfsense_backend/app/retriever/chunks_hybrid_search.py @@ -1,5 +1,8 @@ +import time from datetime import datetime +from app.utils.perf import get_perf_logger + class ChucksHybridSearchRetriever: def __init__(self, db_session): @@ -38,9 +41,17 @@ class ChucksHybridSearchRetriever: from app.config import config from app.db import Chunk, Document + perf = get_perf_logger() + t0 = time.perf_counter() + # Get embedding for the query embedding_model = config.embedding_model_instance + t_embed = time.perf_counter() query_embedding = embedding_model.embed(query_text) + perf.debug( + "[chunk_search] vector_search embedding in %.3fs", + time.perf_counter() - t_embed, + ) # Build the query filtered by search space query = ( @@ -60,8 +71,13 @@ class ChucksHybridSearchRetriever: query = query.order_by(Chunk.embedding.op("<=>")(query_embedding)).limit(top_k) # Execute the query + t_db = time.perf_counter() result = await self.db_session.execute(query) chunks = result.scalars().all() + perf.info( + "[chunk_search] vector_search DB query in %.3fs results=%d (total %.3fs) space=%d", + time.perf_counter() - t_db, len(chunks), time.perf_counter() - t0, search_space_id, + ) return chunks @@ -91,6 +107,9 @@ class ChucksHybridSearchRetriever: from app.db import Chunk, Document + perf = get_perf_logger() + t0 = time.perf_counter() + # Create tsvector and tsquery for PostgreSQL full-text search tsvector = func.to_tsvector("english", Chunk.content) tsquery = func.plainto_tsquery("english", query_text) @@ -118,6 +137,10 @@ class ChucksHybridSearchRetriever: # Execute the query result = await self.db_session.execute(query) chunks = result.scalars().all() + perf.info( + "[chunk_search] full_text_search in %.3fs results=%d space=%d", + time.perf_counter() - t0, len(chunks), search_space_id, + ) return chunks @@ -157,9 +180,17 @@ class ChucksHybridSearchRetriever: from app.config import config from app.db import Chunk, Document, DocumentType + perf = get_perf_logger() + t0 = time.perf_counter() + # Get embedding for the query embedding_model = config.embedding_model_instance + t_embed = time.perf_counter() query_embedding = embedding_model.embed(query_text) + perf.debug( + "[chunk_search] hybrid_search embedding in %.3fs", + time.perf_counter() - t_embed, + ) # RRF constants k = 60 @@ -254,9 +285,14 @@ class ChucksHybridSearchRetriever: .limit(top_k) ) - # Execute the query + # Execute the RRF query + t_rrf = time.perf_counter() result = await self.db_session.execute(final_query) chunks_with_scores = result.all() + perf.info( + "[chunk_search] hybrid_search RRF query in %.3fs results=%d space=%d type=%s", + time.perf_counter() - t_rrf, len(chunks_with_scores), search_space_id, document_type, + ) # If no results were found, return an empty list if not chunks_with_scores: @@ -354,4 +390,8 @@ class ChucksHybridSearchRetriever: ) final_docs.append(entry) + perf.info( + "[chunk_search] hybrid_search TOTAL in %.3fs docs=%d space=%d type=%s", + time.perf_counter() - t0, len(final_docs), search_space_id, document_type, + ) return final_docs diff --git a/surfsense_backend/app/retriever/documents_hybrid_search.py b/surfsense_backend/app/retriever/documents_hybrid_search.py index 9ff104ff0..608e1c2e6 100644 --- a/surfsense_backend/app/retriever/documents_hybrid_search.py +++ b/surfsense_backend/app/retriever/documents_hybrid_search.py @@ -1,5 +1,8 @@ +import time from datetime import datetime +from app.utils.perf import get_perf_logger + class DocumentHybridSearchRetriever: def __init__(self, db_session): @@ -38,6 +41,9 @@ class DocumentHybridSearchRetriever: from app.config import config from app.db import Document + perf = get_perf_logger() + t0 = time.perf_counter() + # Get embedding for the query embedding_model = config.embedding_model_instance query_embedding = embedding_model.embed(query_text) @@ -63,6 +69,10 @@ class DocumentHybridSearchRetriever: # Execute the query result = await self.db_session.execute(query) documents = result.scalars().all() + perf.info( + "[doc_search] vector_search in %.3fs results=%d space=%d", + time.perf_counter() - t0, len(documents), search_space_id, + ) return documents @@ -92,6 +102,9 @@ class DocumentHybridSearchRetriever: from app.db import Document + perf = get_perf_logger() + t0 = time.perf_counter() + # Create tsvector and tsquery for PostgreSQL full-text search tsvector = func.to_tsvector("english", Document.content) tsquery = func.plainto_tsquery("english", query_text) @@ -118,6 +131,10 @@ class DocumentHybridSearchRetriever: # Execute the query result = await self.db_session.execute(query) documents = result.scalars().all() + perf.info( + "[doc_search] full_text_search in %.3fs results=%d space=%d", + time.perf_counter() - t0, len(documents), search_space_id, + ) return documents @@ -151,6 +168,9 @@ class DocumentHybridSearchRetriever: from app.config import config from app.db import Chunk, Document, DocumentType + perf = get_perf_logger() + t0 = time.perf_counter() + # Get embedding for the query embedding_model = config.embedding_model_instance query_embedding = embedding_model.embed(query_text) @@ -303,4 +323,8 @@ class DocumentHybridSearchRetriever: ) final_docs.append(entry) + perf.info( + "[doc_search] hybrid_search TOTAL in %.3fs docs=%d space=%d type=%s", + time.perf_counter() - t0, len(final_docs), search_space_id, document_type, + ) return final_docs diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 3bd9a4421..fa91de391 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -1,4 +1,5 @@ import asyncio +import time from datetime import datetime from typing import Any from urllib.parse import urljoin @@ -18,6 +19,7 @@ from app.db import ( ) from app.retriever.chunks_hybrid_search import ChucksHybridSearchRetriever from app.retriever.documents_hybrid_search import DocumentHybridSearchRetriever +from app.utils.perf import get_perf_logger class ConnectorService: @@ -246,6 +248,9 @@ class ConnectorService: Returns: List of combined and deduplicated document results """ + perf = get_perf_logger() + t0 = time.perf_counter() + # RRF constant k = 60 @@ -259,6 +264,7 @@ class ConnectorService: # "This session is provisioning a new connection; concurrent operations are not permitted" # # So we run them sequentially. + t_chunk = time.perf_counter() chunk_results = await self.chunk_retriever.hybrid_search( query_text=query_text, top_k=retriever_top_k, @@ -267,6 +273,12 @@ class ConnectorService: start_date=start_date, end_date=end_date, ) + perf.info( + "[connector_svc] _combined_rrf chunk_retriever in %.3fs results=%d type=%s", + time.perf_counter() - t_chunk, len(chunk_results), document_type, + ) + + t_doc = time.perf_counter() doc_results = await self.document_retriever.hybrid_search( query_text=query_text, top_k=retriever_top_k, @@ -275,6 +287,10 @@ class ConnectorService: start_date=start_date, end_date=end_date, ) + perf.info( + "[connector_svc] _combined_rrf doc_retriever in %.3fs results=%d type=%s", + time.perf_counter() - t_doc, len(doc_results), document_type, + ) # Helper to extract document_id from our doc-grouped result def _doc_id(item: dict[str, Any]) -> int | None: @@ -335,6 +351,10 @@ class ConnectorService: result["chunks"] = doc_data[did]["chunks"] combined_results.append(result) + perf.info( + "[connector_svc] _combined_rrf_search TOTAL in %.3fs results=%d type=%s space=%d", + time.perf_counter() - t0, len(combined_results), document_type, search_space_id, + ) return combined_results def _get_doc_url(self, metadata: dict[str, Any]) -> str: diff --git a/surfsense_backend/app/services/llm_router_service.py b/surfsense_backend/app/services/llm_router_service.py index 2e517f0ba..e9b84c5cd 100644 --- a/surfsense_backend/app/services/llm_router_service.py +++ b/surfsense_backend/app/services/llm_router_service.py @@ -13,6 +13,7 @@ synchronous ChatLiteLLM-like interface and async methods. import logging import re +import time from typing import Any from langchain_core.callbacks import CallbackManagerForLLMRun @@ -26,6 +27,8 @@ from litellm.exceptions import ( ContextWindowExceededError, ) +from app.utils.perf import get_perf_logger + logger = logging.getLogger(__name__) _CONTEXT_OVERFLOW_PATTERNS = re.compile( @@ -410,6 +413,10 @@ class ChatLiteLLMRouter(BaseChatModel): if not self._router: raise ValueError("Router not initialized") + perf = get_perf_logger() + t0 = time.perf_counter() + msg_count = len(messages) + # Convert LangChain messages to OpenAI format formatted_messages = self._convert_messages(messages) @@ -428,12 +435,28 @@ class ChatLiteLLMRouter(BaseChatModel): **call_kwargs, ) except ContextWindowExceededError as e: + perf.warning( + "[llm_router] _generate CONTEXT_OVERFLOW msgs=%d in %.3fs", + msg_count, time.perf_counter() - t0, + ) raise ContextOverflowError(str(e)) from e except LiteLLMBadRequestError as e: if _is_context_overflow_error(e): + perf.warning( + "[llm_router] _generate CONTEXT_OVERFLOW msgs=%d in %.3fs", + msg_count, time.perf_counter() - t0, + ) raise ContextOverflowError(str(e)) from e raise + elapsed = time.perf_counter() - t0 + perf.info( + "[llm_router] _generate completed msgs=%d tools=%d in %.3fs", + msg_count, + len(self._bound_tools) if self._bound_tools else 0, + elapsed, + ) + # Convert response to ChatResult with potential tool calls message = self._convert_response_to_message(response.choices[0].message) generation = ChatGeneration(message=message) @@ -453,6 +476,10 @@ class ChatLiteLLMRouter(BaseChatModel): if not self._router: raise ValueError("Router not initialized") + perf = get_perf_logger() + t0 = time.perf_counter() + msg_count = len(messages) + # Convert LangChain messages to OpenAI format formatted_messages = self._convert_messages(messages) @@ -471,12 +498,28 @@ class ChatLiteLLMRouter(BaseChatModel): **call_kwargs, ) except ContextWindowExceededError as e: + perf.warning( + "[llm_router] _agenerate CONTEXT_OVERFLOW msgs=%d in %.3fs", + msg_count, time.perf_counter() - t0, + ) raise ContextOverflowError(str(e)) from e except LiteLLMBadRequestError as e: if _is_context_overflow_error(e): + perf.warning( + "[llm_router] _agenerate CONTEXT_OVERFLOW msgs=%d in %.3fs", + msg_count, time.perf_counter() - t0, + ) raise ContextOverflowError(str(e)) from e raise + elapsed = time.perf_counter() - t0 + perf.info( + "[llm_router] _agenerate completed msgs=%d tools=%d in %.3fs", + msg_count, + len(self._bound_tools) if self._bound_tools else 0, + elapsed, + ) + # Convert response to ChatResult with potential tool calls message = self._convert_response_to_message(response.choices[0].message) generation = ChatGeneration(message=message) @@ -541,6 +584,10 @@ class ChatLiteLLMRouter(BaseChatModel): if not self._router: raise ValueError("Router not initialized") + perf = get_perf_logger() + t0 = time.perf_counter() + msg_count = len(messages) + formatted_messages = self._convert_messages(messages) # Add tools if bound @@ -559,20 +606,48 @@ class ChatLiteLLMRouter(BaseChatModel): **call_kwargs, ) except ContextWindowExceededError as e: + perf.warning( + "[llm_router] _astream CONTEXT_OVERFLOW msgs=%d in %.3fs", + msg_count, time.perf_counter() - t0, + ) raise ContextOverflowError(str(e)) from e except LiteLLMBadRequestError as e: if _is_context_overflow_error(e): + perf.warning( + "[llm_router] _astream CONTEXT_OVERFLOW msgs=%d in %.3fs", + msg_count, time.perf_counter() - t0, + ) raise ContextOverflowError(str(e)) from e raise - # Yield chunks asynchronously + t_first_chunk = time.perf_counter() + perf.info( + "[llm_router] _astream connection established msgs=%d in %.3fs", + msg_count, t_first_chunk - t0, + ) + + chunk_count = 0 + first_chunk_logged = False async for chunk in response: if hasattr(chunk, "choices") and chunk.choices: delta = chunk.choices[0].delta chunk_msg = self._convert_delta_to_chunk(delta) if chunk_msg: + chunk_count += 1 + if not first_chunk_logged: + perf.info( + "[llm_router] _astream first chunk in %.3fs (total %.3fs from start)", + time.perf_counter() - t_first_chunk, + time.perf_counter() - t0, + ) + first_chunk_logged = True yield ChatGenerationChunk(message=chunk_msg) + perf.info( + "[llm_router] _astream completed chunks=%d total=%.3fs", + chunk_count, time.perf_counter() - t0, + ) + def _convert_messages(self, messages: list[BaseMessage]) -> list[dict]: """Convert LangChain messages to OpenAI format.""" from langchain_core.messages import ( diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index ddadbc48b..9e91a8735 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -56,14 +56,9 @@ from app.services.chat_session_state_service import ( from app.services.connector_service import ConnectorService from app.services.new_streaming_service import VercelStreamingService from app.utils.content_utils import bootstrap_history_from_db +from app.utils.perf import get_perf_logger, log_system_snapshot -_perf_log = logging.getLogger("surfsense.perf") -_perf_log.setLevel(logging.DEBUG) -if not _perf_log.handlers: - _h = logging.StreamHandler() - _h.setFormatter(logging.Formatter("%(asctime)s [PERF] %(message)s")) - _perf_log.addHandler(_h) - _perf_log.propagate = False +_perf_log = get_perf_logger() _background_tasks: set[asyncio.Task] = set() @@ -1050,6 +1045,7 @@ async def stream_new_chat( streaming_service = VercelStreamingService() stream_result = StreamResult() _t_total = time.perf_counter() + log_system_snapshot("stream_new_chat_START") try: # Mark AI as responding to this user for live collaboration @@ -1382,6 +1378,7 @@ async def stream_new_chat( time.perf_counter() - _t_stream_start, chat_id, ) + log_system_snapshot("stream_new_chat_END") if stream_result.is_interrupted: yield streaming_service.format_finish_step() diff --git a/surfsense_backend/app/utils/perf.py b/surfsense_backend/app/utils/perf.py new file mode 100644 index 000000000..301498048 --- /dev/null +++ b/surfsense_backend/app/utils/perf.py @@ -0,0 +1,122 @@ +""" +Centralized performance monitoring for SurfSense backend. + +Provides: +- A shared [PERF] logger used across all modules +- perf_timer context manager for timing code blocks +- perf_async_timer for async code blocks +- system_snapshot() for CPU/memory profiling +- RequestPerfMiddleware for per-request timing +""" + +import logging +import os +import time +from contextlib import asynccontextmanager, contextmanager +from typing import Any + +_perf_log: logging.Logger | None = None + + +def get_perf_logger() -> logging.Logger: + """Return the singleton [PERF] logger, creating it once on first call.""" + global _perf_log + if _perf_log is None: + _perf_log = logging.getLogger("surfsense.perf") + _perf_log.setLevel(logging.DEBUG) + if not _perf_log.handlers: + h = logging.StreamHandler() + h.setFormatter(logging.Formatter("%(asctime)s [PERF] %(message)s")) + _perf_log.addHandler(h) + _perf_log.propagate = False + return _perf_log + + +@contextmanager +def perf_timer(label: str, *, extra: dict[str, Any] | None = None): + """Synchronous context manager that logs elapsed wall-clock time. + + Usage: + with perf_timer("[my_func] heavy computation"): + ... + """ + log = get_perf_logger() + t0 = time.perf_counter() + yield + elapsed = time.perf_counter() - t0 + suffix = "" + if extra: + suffix = " " + " ".join(f"{k}={v}" for k, v in extra.items()) + log.info("%s in %.3fs%s", label, elapsed, suffix) + + +@asynccontextmanager +async def perf_async_timer(label: str, *, extra: dict[str, Any] | None = None): + """Async context manager that logs elapsed wall-clock time. + + Usage: + async with perf_async_timer("[search] vector search"): + ... + """ + log = get_perf_logger() + t0 = time.perf_counter() + yield + elapsed = time.perf_counter() - t0 + suffix = "" + if extra: + suffix = " " + " ".join(f"{k}={v}" for k, v in extra.items()) + log.info("%s in %.3fs%s", label, elapsed, suffix) + + +def system_snapshot() -> dict[str, Any]: + """Capture a lightweight CPU + memory snapshot of the current process. + + Returns a dict with: + - rss_mb: Resident Set Size in MB + - cpu_percent: CPU usage % since last call (per-process) + - threads: number of active threads + - open_fds: number of open file descriptors (Linux only) + - asyncio_tasks: number of asyncio tasks currently alive + """ + import asyncio + + snapshot: dict[str, Any] = {} + try: + import psutil + + proc = psutil.Process(os.getpid()) + mem = proc.memory_info() + snapshot["rss_mb"] = round(mem.rss / 1024 / 1024, 1) + snapshot["cpu_percent"] = proc.cpu_percent(interval=None) + snapshot["threads"] = proc.num_threads() + try: + snapshot["open_fds"] = proc.num_fds() + except AttributeError: + snapshot["open_fds"] = -1 + except ImportError: + snapshot["rss_mb"] = -1 + snapshot["cpu_percent"] = -1 + snapshot["threads"] = -1 + snapshot["open_fds"] = -1 + + try: + all_tasks = asyncio.all_tasks() + snapshot["asyncio_tasks"] = len(all_tasks) + except RuntimeError: + snapshot["asyncio_tasks"] = -1 + + return snapshot + + +def log_system_snapshot(label: str = "system_snapshot") -> None: + """Capture and log a system snapshot.""" + snap = system_snapshot() + get_perf_logger().info( + "[%s] rss=%.1fMB cpu=%.1f%% threads=%d fds=%d asyncio_tasks=%d", + label, + snap["rss_mb"], + snap["cpu_percent"], + snap["threads"], + snap["open_fds"], + snap["asyncio_tasks"], + ) diff --git a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts index a3e8ae272..5deee8360 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-constants.ts @@ -2,27 +2,28 @@ import { EnumConnectorName } from "@/contracts/enums/connector"; // OAuth Connectors (Quick Connect) export const OAUTH_CONNECTORS = [ - { - id: "google-drive-connector", - title: "Google Drive", - description: "Search your Drive files", - connectorType: EnumConnectorName.GOOGLE_DRIVE_CONNECTOR, - authEndpoint: "/api/v1/auth/google/drive/connector/add/", - }, - { - id: "google-gmail-connector", - title: "Gmail", - description: "Search through your emails", - connectorType: EnumConnectorName.GOOGLE_GMAIL_CONNECTOR, - authEndpoint: "/api/v1/auth/google/gmail/connector/add/", - }, - { - id: "google-calendar-connector", - title: "Google Calendar", - description: "Search through your events", - connectorType: EnumConnectorName.GOOGLE_CALENDAR_CONNECTOR, - authEndpoint: "/api/v1/auth/google/calendar/connector/add/", - }, + // // Uncomment for managed Google Connections + // { + // id: "google-drive-connector", + // title: "Google Drive", + // description: "Search your Drive files", + // connectorType: EnumConnectorName.GOOGLE_DRIVE_CONNECTOR, + // authEndpoint: "/api/v1/auth/google/drive/connector/add/", + // }, + // { + // id: "google-gmail-connector", + // title: "Gmail", + // description: "Search through your emails", + // connectorType: EnumConnectorName.GOOGLE_GMAIL_CONNECTOR, + // authEndpoint: "/api/v1/auth/google/gmail/connector/add/", + // }, + // { + // id: "google-calendar-connector", + // title: "Google Calendar", + // description: "Search through your events", + // connectorType: EnumConnectorName.GOOGLE_CALENDAR_CONNECTOR, + // authEndpoint: "/api/v1/auth/google/calendar/connector/add/", + // }, { id: "airtable-connector", title: "Airtable",