diff --git a/surfsense_backend/app/agents/autocomplete/autocomplete_agent.py b/surfsense_backend/app/agents/autocomplete/autocomplete_agent.py deleted file mode 100644 index 890b3e06e..000000000 --- a/surfsense_backend/app/agents/autocomplete/autocomplete_agent.py +++ /dev/null @@ -1,557 +0,0 @@ -"""Vision autocomplete agent with scoped filesystem exploration. - -Converts the stateless single-shot vision autocomplete into an agent that -seeds a virtual filesystem from KB search results and lets the vision LLM -explore documents via ``ls``, ``read_file``, ``glob``, ``grep``, etc. -before generating the final completion. - -Performance: KB search and agent graph compilation run in parallel so -the only sequential latency is KB-search (or agent compile, whichever is -slower) + the agent's LLM turns. There is no separate "query extraction" -LLM call — the window title is used directly as the KB search query. -""" - -from __future__ import annotations - -import asyncio -import json -import logging -import re -import uuid -from collections.abc import AsyncGenerator -from typing import Any - -from deepagents.graph import BASE_AGENT_PROMPT -from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware -from langchain.agents import create_agent -from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware -from langchain_core.language_models import BaseChatModel -from langchain_core.messages import AIMessage, ToolMessage - -from app.agents.new_chat.document_xml import build_document_xml -from app.agents.new_chat.middleware.filesystem import SurfSenseFilesystemMiddleware -from app.agents.new_chat.middleware.knowledge_search import ( - search_knowledge_base, -) -from app.agents.new_chat.path_resolver import ( - DOCUMENTS_ROOT, - build_path_index, - doc_to_virtual_path, -) -from app.db import shielded_async_session -from app.services.new_streaming_service import VercelStreamingService - -try: - from deepagents.backends.utils import create_file_data -except Exception: # pragma: no cover - defensive - - def create_file_data(content: str) -> dict[str, Any]: - return {"content": content.split("\n")} - - -async def _build_autocomplete_filesystem( - *, - documents: Any, - search_space_id: int, -) -> tuple[dict[str, Any], dict[int, str]]: - """Build a ``state['files']``-shaped dict from KB search results. - - This is the autocomplete-specific replacement for the previous - ``build_scoped_filesystem`` helper. It uses the canonical path resolver - so paths line up with the rest of the system, including collision - suffixes for duplicate titles. - """ - files: dict[str, Any] = {} - doc_id_to_path: dict[int, str] = {} - - if not documents: - return files, doc_id_to_path - - async with shielded_async_session() as session: - index = await build_path_index(session, search_space_id) - - for document in documents: - if not isinstance(document, dict): - continue - meta = document.get("document") or {} - doc_id = meta.get("id") - if not isinstance(doc_id, int): - continue - title = str(meta.get("title") or "untitled") - folder_id = meta.get("folder_id") - path = doc_to_virtual_path( - doc_id=doc_id, title=title, folder_id=folder_id, index=index - ) - chunk_ids = document.get("matched_chunk_ids") or [] - try: - matched_set = {int(c) for c in chunk_ids} - except (TypeError, ValueError): - matched_set = set() - xml = build_document_xml(document, matched_chunk_ids=matched_set) - files[path] = create_file_data(xml) - doc_id_to_path[doc_id] = path - - if not files: - # Ensure the synthetic /documents folder is visible even when empty. - files.setdefault(f"{DOCUMENTS_ROOT}/.placeholder", create_file_data("")) - - return files, doc_id_to_path - - -logger = logging.getLogger(__name__) - -KB_TOP_K = 10 - -# --------------------------------------------------------------------------- -# System prompt -# --------------------------------------------------------------------------- - -AUTOCOMPLETE_SYSTEM_PROMPT = """You are a smart writing assistant that analyzes the user's screen to draft or complete text. - -You will receive a screenshot of the user's screen. Your PRIMARY source of truth is the screenshot itself — the visual context determines what to write. - -Your job: -1. Analyze the ENTIRE screenshot to understand what the user is working on (email thread, chat conversation, document, code editor, form, etc.). -2. Identify the text area where the user will type. -3. Generate the text the user most likely wants to write based on the visual context. - -You also have access to the user's knowledge base documents via filesystem tools. However: -- ONLY consult the knowledge base if the screenshot clearly involves a topic where your KB documents are DIRECTLY relevant (e.g., the user is writing about a specific project/topic that matches a document title). -- Do NOT explore documents just because they exist. Most autocomplete requests can be answered purely from the screenshot. -- If you do read a document, only incorporate information that is 100% relevant to what the user is typing RIGHT NOW. Do not add extra details, background, or tangential information from the KB. -- Keep your output SHORT — autocomplete should feel like a natural continuation, not an essay. - -Key behavior: -- If the text area is EMPTY, draft a concise response or message based on what you see on screen (e.g., reply to an email, respond to a chat message, continue a document). -- If the text area already has text, continue it naturally — typically just a sentence or two. - -Rules: -- Be CONCISE. Prefer a single paragraph or a few sentences. Autocomplete is a quick assist, not a full draft. -- Match the tone and formality of the surrounding context. -- If the screen shows code, write code. If it shows a casual chat, be casual. If it shows a formal email, be formal. -- Do NOT describe the screenshot or explain your reasoning. -- Do NOT cite or reference documents explicitly — just let the knowledge inform your writing naturally. -- If you cannot determine what to write, output an empty JSON array: [] - -## Output Format - -You MUST provide exactly 3 different suggestion options. Each should be a distinct, plausible completion — vary the tone, detail level, or angle. - -Return your suggestions as a JSON array of exactly 3 strings. Output ONLY the JSON array, nothing else — no markdown fences, no explanation, no commentary. - -Example format: -["First suggestion text here.", "Second suggestion — a different take.", "Third option with another approach."] - -## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file`, `glob`, `grep` - -All file paths must start with a `/`. -- ls: list files and directories at a given path. -- read_file: read a file from the filesystem. -- write_file: create a temporary file in the session (not persisted). -- edit_file: edit a file in the session (not persisted for /documents/ files). -- glob: find files matching a pattern (e.g., "**/*.xml"). -- grep: search for text within files. - -## When to Use Filesystem Tools - -BEFORE reaching for any tool, ask yourself: "Can I write a good completion purely from the screenshot?" If yes, just write it — do NOT explore the KB. - -Only use tools when: -- The user is clearly writing about a specific topic that likely has detailed information in their KB. -- You need a specific fact, name, number, or reference that the screenshot doesn't provide. - -When you do use tools, be surgical: -- Check the `ls` output first. If no document title looks relevant, stop — do not read files just to see what's there. -- If a title looks relevant, read only the `` (first ~20 lines) and jump to matched chunks. Do not read entire documents. -- Extract only the specific information you need and move on to generating the completion. - -## Reading Documents Efficiently - -Documents are formatted as XML. Each document contains: -- `` — title, type, URL, etc. -- `` — a table of every chunk with its **line range** and a - `matched="true"` flag for chunks that matched the search query. -- `` — the actual chunks in original document order. - -**Workflow**: read the first ~20 lines to see the ``, identify -chunks marked `matched="true"`, then use `read_file(path, offset=, -limit=)` to jump directly to those sections.""" - -APP_CONTEXT_BLOCK = """ - -The user is currently working in "{app_name}" (window: "{window_title}"). Use this to understand the type of application and adapt your tone and format accordingly.""" - - -def _build_autocomplete_system_prompt(app_name: str, window_title: str) -> str: - prompt = AUTOCOMPLETE_SYSTEM_PROMPT - if app_name: - prompt += APP_CONTEXT_BLOCK.format(app_name=app_name, window_title=window_title) - return prompt - - -# --------------------------------------------------------------------------- -# Pre-compute KB filesystem (runs in parallel with agent compilation) -# --------------------------------------------------------------------------- - - -class _KBResult: - """Container for pre-computed KB filesystem results.""" - - __slots__ = ("files", "ls_ai_msg", "ls_tool_msg") - - def __init__( - self, - files: dict[str, Any] | None = None, - ls_ai_msg: AIMessage | None = None, - ls_tool_msg: ToolMessage | None = None, - ) -> None: - self.files = files - self.ls_ai_msg = ls_ai_msg - self.ls_tool_msg = ls_tool_msg - - @property - def has_documents(self) -> bool: - return bool(self.files) - - -async def precompute_kb_filesystem( - search_space_id: int, - query: str, - top_k: int = KB_TOP_K, -) -> _KBResult: - """Search the KB and build the scoped filesystem outside the agent. - - This is designed to be called via ``asyncio.gather`` alongside agent - graph compilation so the two run concurrently. - """ - if not query: - return _KBResult() - - try: - search_results = await search_knowledge_base( - query=query, - search_space_id=search_space_id, - top_k=top_k, - ) - - if not search_results: - return _KBResult() - - new_files, _ = await _build_autocomplete_filesystem( - documents=search_results, - search_space_id=search_space_id, - ) - - if not new_files: - return _KBResult() - - doc_paths = [ - p - for p, v in new_files.items() - if p.startswith("/documents/") and v is not None - ] - tool_call_id = f"auto_ls_{uuid.uuid4().hex[:12]}" - ai_msg = AIMessage( - content="", - tool_calls=[ - {"name": "ls", "args": {"path": "/documents"}, "id": tool_call_id} - ], - ) - tool_msg = ToolMessage( - content=str(doc_paths) if doc_paths else "No documents found.", - tool_call_id=tool_call_id, - ) - return _KBResult(files=new_files, ls_ai_msg=ai_msg, ls_tool_msg=tool_msg) - - except Exception: - logger.warning( - "KB pre-computation failed, proceeding without KB", exc_info=True - ) - return _KBResult() - - -# --------------------------------------------------------------------------- -# Filesystem middleware — no save_document, no persistence -# --------------------------------------------------------------------------- - - -class AutocompleteFilesystemMiddleware(SurfSenseFilesystemMiddleware): - """Filesystem middleware for autocomplete — read-only exploration only. - - Passes ``search_space_id=None`` so the new persistence pipeline is - bypassed; the autocomplete flow only reads, never commits to Postgres. - """ - - def __init__(self) -> None: - super().__init__(search_space_id=None, created_by_id=None) - - -# --------------------------------------------------------------------------- -# Agent factory -# --------------------------------------------------------------------------- - - -async def _compile_agent( - llm: BaseChatModel, - app_name: str, - window_title: str, -) -> Any: - """Compile the agent graph (CPU-bound, runs in a thread).""" - system_prompt = _build_autocomplete_system_prompt(app_name, window_title) - final_system_prompt = system_prompt + "\n\n" + BASE_AGENT_PROMPT - - middleware = [ - AutocompleteFilesystemMiddleware(), - PatchToolCallsMiddleware(), - AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"), - ] - - agent = await asyncio.to_thread( - create_agent, - llm, - system_prompt=final_system_prompt, - tools=[], - middleware=middleware, - ) - return agent.with_config({"recursion_limit": 200}) - - -async def create_autocomplete_agent( - llm: BaseChatModel, - *, - search_space_id: int, - kb_query: str, - app_name: str = "", - window_title: str = "", -) -> tuple[Any, _KBResult]: - """Create the autocomplete agent and pre-compute KB in parallel. - - Returns ``(agent, kb_result)`` so the caller can inject the pre-computed - filesystem into the agent's initial state without any middleware delay. - """ - agent, kb = await asyncio.gather( - _compile_agent(llm, app_name, window_title), - precompute_kb_filesystem(search_space_id, kb_query), - ) - return agent, kb - - -# --------------------------------------------------------------------------- -# JSON suggestion parsing (with fallback) -# --------------------------------------------------------------------------- - - -def _parse_suggestions(raw: str) -> list[str]: - """Extract a list of suggestion strings from the agent's output. - - Tries, in order: - 1. Direct ``json.loads`` - 2. Extract content between ```json ... ``` fences - 3. Find the first ``[`` … ``]`` span - Falls back to wrapping the raw text as a single suggestion. - """ - text = raw.strip() - if not text: - return [] - - for candidate in _json_candidates(text): - try: - parsed = json.loads(candidate) - if isinstance(parsed, list) and all(isinstance(s, str) for s in parsed): - return [s for s in parsed if s.strip()] - except (json.JSONDecodeError, ValueError): - continue - - return [text] - - -def _json_candidates(text: str) -> list[str]: - """Yield candidate JSON strings from raw text.""" - candidates = [text] - - fence = re.search(r"```(?:json)?\s*\n?(.*?)```", text, re.DOTALL) - if fence: - candidates.append(fence.group(1).strip()) - - bracket = re.search(r"\[.*]", text, re.DOTALL) - if bracket: - candidates.append(bracket.group(0)) - - return candidates - - -# --------------------------------------------------------------------------- -# Streaming helper -# --------------------------------------------------------------------------- - - -async def stream_autocomplete_agent( - agent: Any, - input_data: dict[str, Any], - streaming_service: VercelStreamingService, - *, - emit_message_start: bool = True, -) -> AsyncGenerator[str, None]: - """Stream agent events as Vercel SSE, with thinking steps for tool calls. - - When ``emit_message_start`` is False the caller has already sent the - ``message_start`` event (e.g. to show preparation steps before the agent - runs). - """ - thread_id = uuid.uuid4().hex - config = {"configurable": {"thread_id": thread_id}} - - text_buffer: list[str] = [] - active_tool_depth = 0 - thinking_step_counter = 0 - tool_step_ids: dict[str, str] = {} - step_titles: dict[str, str] = {} - completed_step_ids: set[str] = set() - last_active_step_id: str | None = None - - def next_thinking_step_id() -> str: - nonlocal thinking_step_counter - thinking_step_counter += 1 - return f"autocomplete-step-{thinking_step_counter}" - - def complete_current_step() -> str | None: - nonlocal last_active_step_id - if last_active_step_id and last_active_step_id not in completed_step_ids: - completed_step_ids.add(last_active_step_id) - title = step_titles.get(last_active_step_id, "Done") - event = streaming_service.format_thinking_step( - step_id=last_active_step_id, - title=title, - status="complete", - ) - last_active_step_id = None - return event - return None - - if emit_message_start: - yield streaming_service.format_message_start() - - gen_step_id = next_thinking_step_id() - last_active_step_id = gen_step_id - step_titles[gen_step_id] = "Generating suggestions" - yield streaming_service.format_thinking_step( - step_id=gen_step_id, - title="Generating suggestions", - status="in_progress", - ) - - try: - async for event in agent.astream_events( - input_data, config=config, version="v2" - ): - event_type = event.get("event", "") - if event_type == "on_chat_model_stream": - if active_tool_depth > 0: - continue - if "surfsense:internal" in event.get("tags", []): - continue - chunk = event.get("data", {}).get("chunk") - if chunk and hasattr(chunk, "content"): - content = chunk.content - if content and isinstance(content, str): - text_buffer.append(content) - - elif event_type == "on_chat_model_end": - if active_tool_depth > 0: - continue - if "surfsense:internal" in event.get("tags", []): - continue - output = event.get("data", {}).get("output") - if output and hasattr(output, "content"): - if getattr(output, "tool_calls", None): - continue - content = output.content - if content and isinstance(content, str) and not text_buffer: - text_buffer.append(content) - - elif event_type == "on_tool_start": - active_tool_depth += 1 - tool_name = event.get("name", "unknown_tool") - run_id = event.get("run_id", "") - tool_input = event.get("data", {}).get("input", {}) - - step_event = complete_current_step() - if step_event: - yield step_event - - tool_step_id = next_thinking_step_id() - tool_step_ids[run_id] = tool_step_id - last_active_step_id = tool_step_id - - title, items = _describe_tool_call(tool_name, tool_input) - step_titles[tool_step_id] = title - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title=title, - status="in_progress", - items=items, - ) - - elif event_type == "on_tool_end": - active_tool_depth = max(0, active_tool_depth - 1) - run_id = event.get("run_id", "") - step_id = tool_step_ids.pop(run_id, None) - if step_id and step_id not in completed_step_ids: - completed_step_ids.add(step_id) - title = step_titles.get(step_id, "Done") - yield streaming_service.format_thinking_step( - step_id=step_id, - title=title, - status="complete", - ) - if last_active_step_id == step_id: - last_active_step_id = None - - step_event = complete_current_step() - if step_event: - yield step_event - - raw_text = "".join(text_buffer) - suggestions = _parse_suggestions(raw_text) - - yield streaming_service.format_data("suggestions", {"options": suggestions}) - - yield streaming_service.format_finish() - yield streaming_service.format_done() - - except Exception as e: - logger.error(f"Autocomplete agent streaming error: {e}", exc_info=True) - yield streaming_service.format_error("Autocomplete failed. Please try again.") - yield streaming_service.format_done() - - -def _describe_tool_call(tool_name: str, tool_input: Any) -> tuple[str, list[str]]: - """Return a human-readable (title, items) for a tool call thinking step.""" - inp = tool_input if isinstance(tool_input, dict) else {} - if tool_name == "ls": - path = inp.get("path", "/") - return "Listing files", [path] - if tool_name == "read_file": - fp = inp.get("file_path", "") - display = fp if len(fp) <= 80 else "…" + fp[-77:] - return "Reading file", [display] - if tool_name == "write_file": - fp = inp.get("file_path", "") - display = fp if len(fp) <= 80 else "…" + fp[-77:] - return "Writing file", [display] - if tool_name == "edit_file": - fp = inp.get("file_path", "") - display = fp if len(fp) <= 80 else "…" + fp[-77:] - return "Editing file", [display] - if tool_name == "glob": - pat = inp.get("pattern", "") - base = inp.get("path", "/") - return "Searching files", [f"{pat} in {base}"] - if tool_name == "grep": - pat = inp.get("pattern", "") - path = inp.get("path", "") - display_pat = pat[:60] + ("…" if len(pat) > 60 else "") - return "Searching content", [ - f'"{display_pat}"' + (f" in {path}" if path else "") - ] - return f"Using {tool_name}", [] diff --git a/surfsense_backend/app/agents/multi_agent_chat/main_agent/system_prompt/builder/provider_hints.py b/surfsense_backend/app/agents/multi_agent_chat/main_agent/system_prompt/builder/provider_hints.py deleted file mode 100644 index 78d7b08ec..000000000 --- a/surfsense_backend/app/agents/multi_agent_chat/main_agent/system_prompt/builder/provider_hints.py +++ /dev/null @@ -1,50 +0,0 @@ -"""Provider-specific style hints from ``prompts/providers/`` (main agent only).""" - -from __future__ import annotations - -import re - -from .load_md import read_prompt_md - -ProviderVariant = str - -_OPENAI_CODEX_RE = re.compile( - r"\b(gpt-codex|codex-mini|gpt-[\d.]+-codex)\b", re.IGNORECASE -) -_OPENAI_REASONING_RE = re.compile(r"\b(gpt-5|o\d|o-)", re.IGNORECASE) -_OPENAI_CLASSIC_RE = re.compile(r"\bgpt-4", re.IGNORECASE) -_ANTHROPIC_RE = re.compile(r"\bclaude\b", re.IGNORECASE) -_GOOGLE_RE = re.compile(r"\bgemini\b", re.IGNORECASE) -_KIMI_RE = re.compile(r"\b(kimi[-\d.]*|moonshot)\b", re.IGNORECASE) -_GROK_RE = re.compile(r"\bgrok\b", re.IGNORECASE) -_DEEPSEEK_RE = re.compile(r"\bdeepseek\b", re.IGNORECASE) - - -def detect_provider_variant(model_name: str | None) -> ProviderVariant: - if not model_name: - return "default" - name = model_name.strip() - if _OPENAI_CODEX_RE.search(name): - return "openai_codex" - if _OPENAI_REASONING_RE.search(name): - return "openai_reasoning" - if _OPENAI_CLASSIC_RE.search(name): - return "openai_classic" - if _ANTHROPIC_RE.search(name): - return "anthropic" - if _GOOGLE_RE.search(name): - return "google" - if _KIMI_RE.search(name): - return "kimi" - if _GROK_RE.search(name): - return "grok" - if _DEEPSEEK_RE.search(name): - return "deepseek" - return "default" - - -def build_provider_hint_block(provider_variant: ProviderVariant) -> str: - if not provider_variant or provider_variant == "default": - return "" - text = read_prompt_md(f"providers/{provider_variant}.md") - return f"\n{text}\n" if text else "" diff --git a/surfsense_backend/app/agents/multi_agent_chat/main_agent/system_prompt/builder/sections/provider.py b/surfsense_backend/app/agents/multi_agent_chat/main_agent/system_prompt/builder/sections/provider.py deleted file mode 100644 index 7de722080..000000000 --- a/surfsense_backend/app/agents/multi_agent_chat/main_agent/system_prompt/builder/sections/provider.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Provider-specific style hints.""" - -from __future__ import annotations - -from ..provider_hints import build_provider_hint_block, detect_provider_variant - - -def build_provider_section(*, model_name: str | None) -> str: - return build_provider_hint_block(detect_provider_variant(model_name)) diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/__init__.py b/surfsense_backend/app/agents/new_chat/tools/linear/__init__.py deleted file mode 100644 index 31acf1e2a..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/linear/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Linear tools for creating, updating, and deleting issues.""" - -from .create_issue import create_create_linear_issue_tool -from .delete_issue import create_delete_linear_issue_tool -from .update_issue import create_update_linear_issue_tool - -__all__ = [ - "create_create_linear_issue_tool", - "create_delete_linear_issue_tool", - "create_update_linear_issue_tool", -] diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py b/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py deleted file mode 100644 index f897bee7a..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/linear/create_issue.py +++ /dev/null @@ -1,266 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.new_chat.tools.hitl import request_approval -from app.connectors.linear_connector import LinearAPIError, LinearConnector -from app.db import async_session_maker -from app.services.linear import LinearToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_create_linear_issue_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """Factory function to create the create_linear_issue tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker`. This is critical for the compiled-agent - cache: the compiled graph (and therefore this closure) is reused - across HTTP requests, so capturing a per-request session here would - surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Linear connector - user_id: User ID for fetching user-specific context - connector_id: Optional specific connector ID (if known) - - Returns: - Configured create_linear_issue tool - """ - del db_session # per-call session — see docstring - - @tool - async def create_linear_issue( - title: str, - description: str | None = None, - ) -> dict[str, Any]: - """Create a new issue in Linear. - - Use this tool when the user explicitly asks to create, add, or file - a new issue / ticket / task in Linear. The user MUST describe the issue - before you call this tool. If the request is vague, ask what the issue - should be about. Never call this tool without a clear topic from the user. - - Args: - title: Short, descriptive issue title. Infer from the user's request. - description: Optional markdown body for the issue. Generate from context. - - Returns: - Dictionary with: - - status: "success", "rejected", or "error" - - issue_id: Linear issue UUID (if success) - - identifier: Human-readable ID like "ENG-42" (if success) - - url: URL to the created issue (if success) - - message: Result message - - IMPORTANT: If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment (e.g., "Understood, I won't create the issue.") - and move on. Do NOT retry, troubleshoot, or suggest alternatives. - - Examples: - - "Create a Linear issue for the login bug" - - "File a ticket about the payment timeout problem" - - "Add an issue for the broken search feature" - """ - logger.info(f"create_linear_issue called: title='{title}'") - - if search_space_id is None or user_id is None: - logger.error( - "Linear tool not properly configured - missing required parameters" - ) - return { - "status": "error", - "message": "Linear tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = LinearToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - logger.error( - f"Failed to fetch creation context: {context['error']}" - ) - return {"status": "error", "message": context["error"]} - - workspaces = context.get("workspaces", []) - if workspaces and all(w.get("auth_expired") for w in workspaces): - logger.warning("All Linear accounts have expired authentication") - return { - "status": "auth_error", - "message": "All connected Linear accounts need re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "linear", - } - - logger.info(f"Requesting approval for creating Linear issue: '{title}'") - result = request_approval( - action_type="linear_issue_creation", - tool_name="create_linear_issue", - params={ - "title": title, - "description": description, - "team_id": None, - "state_id": None, - "assignee_id": None, - "priority": None, - "label_ids": [], - "connector_id": connector_id, - }, - context=context, - ) - - if result.rejected: - logger.info("Linear issue creation rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_title = result.params.get("title", title) - final_description = result.params.get("description", description) - final_team_id = result.params.get("team_id") - final_state_id = result.params.get("state_id") - final_assignee_id = result.params.get("assignee_id") - final_priority = result.params.get("priority") - final_label_ids = result.params.get("label_ids") or [] - final_connector_id = result.params.get("connector_id", connector_id) - - if not final_title or not final_title.strip(): - logger.error("Title is empty or contains only whitespace") - return { - "status": "error", - "message": "Issue title cannot be empty.", - } - if not final_team_id: - return { - "status": "error", - "message": "A team must be selected to create an issue.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - actual_connector_id = final_connector_id - if actual_connector_id is None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.LINEAR_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Linear connector found. Please connect Linear in your workspace settings.", - } - actual_connector_id = connector.id - logger.info(f"Found Linear connector: id={actual_connector_id}") - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == actual_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.LINEAR_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Linear connector is invalid or has been disconnected.", - } - logger.info(f"Validated Linear connector: id={actual_connector_id}") - - logger.info( - f"Creating Linear issue with final params: title='{final_title}'" - ) - linear_client = LinearConnector( - session=db_session, connector_id=actual_connector_id - ) - result = await linear_client.create_issue( - team_id=final_team_id, - title=final_title, - description=final_description, - state_id=final_state_id, - assignee_id=final_assignee_id, - priority=final_priority, - label_ids=final_label_ids if final_label_ids else None, - ) - - if result.get("status") == "error": - logger.error( - f"Failed to create Linear issue: {result.get('message')}" - ) - return {"status": "error", "message": result.get("message")} - - logger.info( - f"Linear issue created: {result.get('identifier')} - {result.get('title')}" - ) - - kb_message_suffix = "" - try: - from app.services.linear import LinearKBSyncService - - kb_service = LinearKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - issue_id=result.get("id"), - issue_identifier=result.get("identifier", ""), - issue_title=result.get("title", final_title), - issue_url=result.get("url"), - description=final_description, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after create failed: {kb_err}") - kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." - - return { - "status": "success", - "issue_id": result.get("id"), - "identifier": result.get("identifier"), - "url": result.get("url"), - "message": (result.get("message", "") + kb_message_suffix), - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error creating Linear issue: {e}", exc_info=True) - if isinstance(e, ValueError | LinearAPIError): - message = str(e) - else: - message = ( - "Something went wrong while creating the issue. Please try again." - ) - return {"status": "error", "message": message} - - return create_linear_issue diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/delete_issue.py b/surfsense_backend/app/agents/new_chat/tools/linear/delete_issue.py deleted file mode 100644 index c5039a8eb..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/linear/delete_issue.py +++ /dev/null @@ -1,256 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.new_chat.tools.hitl import request_approval -from app.connectors.linear_connector import LinearAPIError, LinearConnector -from app.db import async_session_maker -from app.services.linear import LinearToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_delete_linear_issue_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """Factory function to create the delete_linear_issue tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker`. This is critical for the compiled-agent - cache: the compiled graph (and therefore this closure) is reused - across HTTP requests, so capturing a per-request session here would - surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Linear connector - user_id: User ID for finding the correct Linear connector - connector_id: Optional specific connector ID (if known) - - Returns: - Configured delete_linear_issue tool - """ - del db_session # per-call session — see docstring - - @tool - async def delete_linear_issue( - issue_ref: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Archive (delete) a Linear issue. - - Use this tool when the user asks to delete, remove, or archive a Linear issue. - Note that Linear archives issues rather than permanently deleting them - (they can be restored from the archive). - - - Args: - issue_ref: The issue to delete. Can be the issue title (e.g. "Fix login bug"), - the identifier (e.g. "ENG-42"), or the full document title - (e.g. "ENG-42: Fix login bug"). - delete_from_kb: Whether to also remove the issue from the knowledge base. - Default is False. Set to True to remove from both Linear - and the knowledge base. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - identifier: Human-readable ID like "ENG-42" (if success) - - message: Success or error message - - deleted_from_kb: Whether the issue was also removed from the knowledge base (if success) - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment (e.g., "Understood, I won't delete the issue.") - and move on. Do NOT ask for alternatives or troubleshoot. - - If status is "not_found", inform the user conversationally using the exact message - provided. Do NOT treat this as an error. Simply relay the message and ask the user - to verify the issue title or identifier, or check if it has been indexed. - Examples: - - "Delete the 'Fix login bug' Linear issue" - - "Archive ENG-42" - - "Remove the 'Old payment flow' issue from Linear" - """ - logger.info( - f"delete_linear_issue called: issue_ref='{issue_ref}', delete_from_kb={delete_from_kb}" - ) - - if search_space_id is None or user_id is None: - logger.error( - "Linear tool not properly configured - missing required parameters" - ) - return { - "status": "error", - "message": "Linear tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = LinearToolMetadataService(db_session) - context = await metadata_service.get_delete_context( - search_space_id, user_id, issue_ref - ) - - if "error" in context: - error_msg = context["error"] - if context.get("auth_expired"): - logger.warning(f"Auth expired for delete context: {error_msg}") - return { - "status": "auth_error", - "message": error_msg, - "connector_id": context.get("connector_id"), - "connector_type": "linear", - } - if "not found" in error_msg.lower(): - logger.warning(f"Issue not found: {error_msg}") - return {"status": "not_found", "message": error_msg} - else: - logger.error(f"Failed to fetch delete context: {error_msg}") - return {"status": "error", "message": error_msg} - - issue_id = context["issue"]["id"] - issue_identifier = context["issue"].get("identifier", "") - document_id = context["issue"]["document_id"] - connector_id_from_context = context.get("workspace", {}).get("id") - - logger.info( - f"Requesting approval for deleting Linear issue: '{issue_ref}' " - f"(id={issue_id}, delete_from_kb={delete_from_kb})" - ) - result = request_approval( - action_type="linear_issue_deletion", - tool_name="delete_linear_issue", - params={ - "issue_id": issue_id, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - logger.info("Linear issue deletion rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_issue_id = result.params.get("issue_id", issue_id) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_delete_from_kb = result.params.get( - "delete_from_kb", delete_from_kb - ) - - logger.info( - f"Deleting Linear issue with final params: issue_id={final_issue_id}, " - f"connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}" - ) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if final_connector_id: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.LINEAR_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - logger.error( - f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" - ) - return { - "status": "error", - "message": "Selected Linear connector is invalid or has been disconnected.", - } - actual_connector_id = connector.id - logger.info(f"Validated Linear connector: id={actual_connector_id}") - else: - logger.error("No connector found for this issue") - return { - "status": "error", - "message": "No connector found for this issue.", - } - - linear_client = LinearConnector( - session=db_session, connector_id=actual_connector_id - ) - - result = await linear_client.archive_issue(issue_id=final_issue_id) - - logger.info( - f"archive_issue result: {result.get('status')} - {result.get('message', '')}" - ) - - deleted_from_kb = False - if ( - result.get("status") == "success" - and final_delete_from_kb - and document_id - ): - try: - from app.db import Document - - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - logger.info( - f"Deleted document {document_id} from knowledge base" - ) - else: - logger.warning(f"Document {document_id} not found in KB") - except Exception as e: - logger.error(f"Failed to delete document from KB: {e}") - await db_session.rollback() - result["warning"] = ( - f"Issue archived in Linear, but failed to remove from knowledge base: {e!s}" - ) - - if result.get("status") == "success": - result["deleted_from_kb"] = deleted_from_kb - if issue_identifier: - result["message"] = ( - f"Issue {issue_identifier} archived successfully." - ) - if deleted_from_kb: - result["message"] = ( - f"{result.get('message', '')} Also removed from the knowledge base." - ) - - return result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error deleting Linear issue: {e}", exc_info=True) - if isinstance(e, ValueError | LinearAPIError): - message = str(e) - else: - message = ( - "Something went wrong while deleting the issue. Please try again." - ) - return {"status": "error", "message": message} - - return delete_linear_issue diff --git a/surfsense_backend/app/agents/new_chat/tools/linear/update_issue.py b/surfsense_backend/app/agents/new_chat/tools/linear/update_issue.py deleted file mode 100644 index d610ce2b7..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/linear/update_issue.py +++ /dev/null @@ -1,327 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.new_chat.tools.hitl import request_approval -from app.connectors.linear_connector import LinearAPIError, LinearConnector -from app.db import async_session_maker -from app.services.linear import LinearKBSyncService, LinearToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_update_linear_issue_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """Factory function to create the update_linear_issue tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker`. This is critical for the compiled-agent - cache: the compiled graph (and therefore this closure) is reused - across HTTP requests, so capturing a per-request session here would - surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Linear connector - user_id: User ID for fetching user-specific context - connector_id: Optional specific connector ID (if known) - - Returns: - Configured update_linear_issue tool - """ - del db_session # per-call session — see docstring - - @tool - async def update_linear_issue( - issue_ref: str, - new_title: str | None = None, - new_description: str | None = None, - new_state_name: str | None = None, - new_assignee_email: str | None = None, - new_priority: int | None = None, - new_label_names: list[str] | None = None, - ) -> dict[str, Any]: - """Update an existing Linear issue that has been indexed in the knowledge base. - - Use this tool when the user asks to modify, change, or update a Linear issue — - for example, changing its status, reassigning it, updating its title or description, - adjusting its priority, or changing its labels. - - Only issues already indexed in the knowledge base can be updated. - - Args: - issue_ref: The issue to update. Can be the issue title (e.g. "Fix login bug"), - the identifier (e.g. "ENG-42"), or the full document title - (e.g. "ENG-42: Fix login bug"). Matched case-insensitively. - new_title: New title for the issue (optional). - new_description: New markdown body for the issue (optional). - new_state_name: New workflow state name (e.g. "In Progress", "Done"). - Matched case-insensitively against the team's states. - new_assignee_email: Email address of the new assignee. - Matched case-insensitively against the team's members. - new_priority: New priority (0 = No Priority, 1 = Urgent, 2 = High, - 3 = Medium, 4 = Low). - new_label_names: New set of label names to apply. - Matched case-insensitively against the team's labels. - Unrecognised names are silently skipped. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - identifier: Human-readable ID like "ENG-42" (if success) - - url: URL to the updated issue (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment (e.g., "Understood, I didn't update the issue.") - and move on. Do NOT ask for alternatives or troubleshoot. - - If status is "not_found", inform the user conversationally using the exact message - provided. Do NOT treat this as an error. Simply relay the message and ask the user - to verify the issue title or identifier, or check if it has been indexed. - - Examples: - - "Mark the 'Fix login bug' issue as done" - - "Assign ENG-42 to john@company.com" - - "Change the priority of 'Payment timeout' to urgent" - """ - logger.info(f"update_linear_issue called: issue_ref='{issue_ref}'") - - if search_space_id is None or user_id is None: - logger.error( - "Linear tool not properly configured - missing required parameters" - ) - return { - "status": "error", - "message": "Linear tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = LinearToolMetadataService(db_session) - context = await metadata_service.get_update_context( - search_space_id, user_id, issue_ref - ) - - if "error" in context: - error_msg = context["error"] - if context.get("auth_expired"): - logger.warning(f"Auth expired for update context: {error_msg}") - return { - "status": "auth_error", - "message": error_msg, - "connector_id": context.get("connector_id"), - "connector_type": "linear", - } - if "not found" in error_msg.lower(): - logger.warning(f"Issue not found: {error_msg}") - return {"status": "not_found", "message": error_msg} - else: - logger.error(f"Failed to fetch update context: {error_msg}") - return {"status": "error", "message": error_msg} - - issue_id = context["issue"]["id"] - document_id = context["issue"]["document_id"] - connector_id_from_context = context.get("workspace", {}).get("id") - - team = context.get("team", {}) - new_state_id = _resolve_state(team, new_state_name) - new_assignee_id = _resolve_assignee(team, new_assignee_email) - new_label_ids = _resolve_labels(team, new_label_names) - - logger.info( - f"Requesting approval for updating Linear issue: '{issue_ref}' (id={issue_id})" - ) - result = request_approval( - action_type="linear_issue_update", - tool_name="update_linear_issue", - params={ - "issue_id": issue_id, - "document_id": document_id, - "new_title": new_title, - "new_description": new_description, - "new_state_id": new_state_id, - "new_assignee_id": new_assignee_id, - "new_priority": new_priority, - "new_label_ids": new_label_ids, - "connector_id": connector_id_from_context, - }, - context=context, - ) - - if result.rejected: - logger.info("Linear issue update rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_issue_id = result.params.get("issue_id", issue_id) - final_document_id = result.params.get("document_id", document_id) - final_new_title = result.params.get("new_title", new_title) - final_new_description = result.params.get( - "new_description", new_description - ) - final_new_state_id = result.params.get("new_state_id", new_state_id) - final_new_assignee_id = result.params.get( - "new_assignee_id", new_assignee_id - ) - final_new_priority = result.params.get("new_priority", new_priority) - final_new_label_ids: list[str] | None = result.params.get( - "new_label_ids", new_label_ids - ) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - - if not final_connector_id: - logger.error("No connector found for this issue") - return { - "status": "error", - "message": "No connector found for this issue.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.LINEAR_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - logger.error( - f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" - ) - return { - "status": "error", - "message": "Selected Linear connector is invalid or has been disconnected.", - } - logger.info(f"Validated Linear connector: id={final_connector_id}") - - logger.info( - f"Updating Linear issue with final params: issue_id={final_issue_id}" - ) - linear_client = LinearConnector( - session=db_session, connector_id=final_connector_id - ) - updated_issue = await linear_client.update_issue( - issue_id=final_issue_id, - title=final_new_title, - description=final_new_description, - state_id=final_new_state_id, - assignee_id=final_new_assignee_id, - priority=final_new_priority, - label_ids=final_new_label_ids, - ) - - if updated_issue.get("status") == "error": - logger.error( - f"Failed to update Linear issue: {updated_issue.get('message')}" - ) - return { - "status": "error", - "message": updated_issue.get("message"), - } - - logger.info( - f"update_issue result: {updated_issue.get('identifier')} - {updated_issue.get('title')}" - ) - - if final_document_id is not None: - logger.info( - f"Updating knowledge base for document {final_document_id}..." - ) - kb_service = LinearKBSyncService(db_session) - kb_result = await kb_service.sync_after_update( - document_id=final_document_id, - issue_id=final_issue_id, - user_id=user_id, - search_space_id=search_space_id, - ) - if kb_result["status"] == "success": - logger.info( - f"Knowledge base successfully updated for issue {final_issue_id}" - ) - kb_message = " Your knowledge base has also been updated." - elif kb_result["status"] == "not_indexed": - kb_message = " This issue will be added to your knowledge base in the next scheduled sync." - else: - logger.warning( - f"KB update failed for issue {final_issue_id}: {kb_result.get('message')}" - ) - kb_message = " Your knowledge base will be updated in the next scheduled sync." - else: - kb_message = "" - - identifier = updated_issue.get("identifier") - default_msg = f"Issue {identifier} updated successfully." - return { - "status": "success", - "identifier": identifier, - "url": updated_issue.get("url"), - "message": f"{updated_issue.get('message', default_msg)}{kb_message}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error updating Linear issue: {e}", exc_info=True) - if isinstance(e, ValueError | LinearAPIError): - message = str(e) - else: - message = ( - "Something went wrong while updating the issue. Please try again." - ) - return {"status": "error", "message": message} - - return update_linear_issue - - -def _resolve_state(team: dict, state_name: str | None) -> str | None: - if not state_name: - return None - name_lower = state_name.lower() - for state in team.get("states", []): - if state.get("name", "").lower() == name_lower: - return state["id"] - return None - - -def _resolve_assignee(team: dict, assignee_email: str | None) -> str | None: - if not assignee_email: - return None - email_lower = assignee_email.lower() - for member in team.get("members", []): - if member.get("email", "").lower() == email_lower: - return member["id"] - return None - - -def _resolve_labels(team: dict, label_names: list[str] | None) -> list[str] | None: - if label_names is None: - return None - if not label_names: - return [] - name_set = {n.lower() for n in label_names} - return [ - label["id"] - for label in team.get("labels", []) - if label.get("name", "").lower() in name_set - ] diff --git a/surfsense_backend/app/agents/new_chat/tools/tool_response.py b/surfsense_backend/app/agents/new_chat/tools/tool_response.py deleted file mode 100644 index 8644ada5c..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/tool_response.py +++ /dev/null @@ -1,38 +0,0 @@ -"""Standardised response dict factories for LangChain agent tools.""" - -from __future__ import annotations - -from typing import Any - - -class ToolResponse: - @staticmethod - def success(message: str, **data: Any) -> dict[str, Any]: - return {"status": "success", "message": message, **data} - - @staticmethod - def error(error: str, **data: Any) -> dict[str, Any]: - return {"status": "error", "error": error, **data} - - @staticmethod - def auth_error(service: str, **data: Any) -> dict[str, Any]: - return { - "status": "auth_error", - "error": ( - f"{service} authentication has expired or been revoked. " - "Please re-connect the integration in Settings → Connectors." - ), - **data, - } - - @staticmethod - def rejected(message: str = "Action was declined by the user.") -> dict[str, Any]: - return {"status": "rejected", "message": message} - - @staticmethod - def not_found(resource: str, identifier: str, **data: Any) -> dict[str, Any]: - return { - "status": "not_found", - "error": f"{resource} '{identifier}' was not found.", - **data, - }