mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-06 20:15:17 +02:00
chore: remove dead agent code (vision autocomplete, linear tools, provider hints)
This commit is contained in:
parent
fb70e23dd2
commit
0b006badb0
8 changed files with 0 additions and 1514 deletions
|
|
@ -1,557 +0,0 @@
|
|||
"""Vision autocomplete agent with scoped filesystem exploration.
|
||||
|
||||
Converts the stateless single-shot vision autocomplete into an agent that
|
||||
seeds a virtual filesystem from KB search results and lets the vision LLM
|
||||
explore documents via ``ls``, ``read_file``, ``glob``, ``grep``, etc.
|
||||
before generating the final completion.
|
||||
|
||||
Performance: KB search and agent graph compilation run in parallel so
|
||||
the only sequential latency is KB-search (or agent compile, whichever is
|
||||
slower) + the agent's LLM turns. There is no separate "query extraction"
|
||||
LLM call — the window title is used directly as the KB search query.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
from deepagents.graph import BASE_AGENT_PROMPT
|
||||
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
|
||||
from langchain.agents import create_agent
|
||||
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
|
||||
from langchain_core.language_models import BaseChatModel
|
||||
from langchain_core.messages import AIMessage, ToolMessage
|
||||
|
||||
from app.agents.new_chat.document_xml import build_document_xml
|
||||
from app.agents.new_chat.middleware.filesystem import SurfSenseFilesystemMiddleware
|
||||
from app.agents.new_chat.middleware.knowledge_search import (
|
||||
search_knowledge_base,
|
||||
)
|
||||
from app.agents.new_chat.path_resolver import (
|
||||
DOCUMENTS_ROOT,
|
||||
build_path_index,
|
||||
doc_to_virtual_path,
|
||||
)
|
||||
from app.db import shielded_async_session
|
||||
from app.services.new_streaming_service import VercelStreamingService
|
||||
|
||||
try:
|
||||
from deepagents.backends.utils import create_file_data
|
||||
except Exception: # pragma: no cover - defensive
|
||||
|
||||
def create_file_data(content: str) -> dict[str, Any]:
|
||||
return {"content": content.split("\n")}
|
||||
|
||||
|
||||
async def _build_autocomplete_filesystem(
|
||||
*,
|
||||
documents: Any,
|
||||
search_space_id: int,
|
||||
) -> tuple[dict[str, Any], dict[int, str]]:
|
||||
"""Build a ``state['files']``-shaped dict from KB search results.
|
||||
|
||||
This is the autocomplete-specific replacement for the previous
|
||||
``build_scoped_filesystem`` helper. It uses the canonical path resolver
|
||||
so paths line up with the rest of the system, including collision
|
||||
suffixes for duplicate titles.
|
||||
"""
|
||||
files: dict[str, Any] = {}
|
||||
doc_id_to_path: dict[int, str] = {}
|
||||
|
||||
if not documents:
|
||||
return files, doc_id_to_path
|
||||
|
||||
async with shielded_async_session() as session:
|
||||
index = await build_path_index(session, search_space_id)
|
||||
|
||||
for document in documents:
|
||||
if not isinstance(document, dict):
|
||||
continue
|
||||
meta = document.get("document") or {}
|
||||
doc_id = meta.get("id")
|
||||
if not isinstance(doc_id, int):
|
||||
continue
|
||||
title = str(meta.get("title") or "untitled")
|
||||
folder_id = meta.get("folder_id")
|
||||
path = doc_to_virtual_path(
|
||||
doc_id=doc_id, title=title, folder_id=folder_id, index=index
|
||||
)
|
||||
chunk_ids = document.get("matched_chunk_ids") or []
|
||||
try:
|
||||
matched_set = {int(c) for c in chunk_ids}
|
||||
except (TypeError, ValueError):
|
||||
matched_set = set()
|
||||
xml = build_document_xml(document, matched_chunk_ids=matched_set)
|
||||
files[path] = create_file_data(xml)
|
||||
doc_id_to_path[doc_id] = path
|
||||
|
||||
if not files:
|
||||
# Ensure the synthetic /documents folder is visible even when empty.
|
||||
files.setdefault(f"{DOCUMENTS_ROOT}/.placeholder", create_file_data(""))
|
||||
|
||||
return files, doc_id_to_path
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
KB_TOP_K = 10
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# System prompt
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
AUTOCOMPLETE_SYSTEM_PROMPT = """You are a smart writing assistant that analyzes the user's screen to draft or complete text.
|
||||
|
||||
You will receive a screenshot of the user's screen. Your PRIMARY source of truth is the screenshot itself — the visual context determines what to write.
|
||||
|
||||
Your job:
|
||||
1. Analyze the ENTIRE screenshot to understand what the user is working on (email thread, chat conversation, document, code editor, form, etc.).
|
||||
2. Identify the text area where the user will type.
|
||||
3. Generate the text the user most likely wants to write based on the visual context.
|
||||
|
||||
You also have access to the user's knowledge base documents via filesystem tools. However:
|
||||
- ONLY consult the knowledge base if the screenshot clearly involves a topic where your KB documents are DIRECTLY relevant (e.g., the user is writing about a specific project/topic that matches a document title).
|
||||
- Do NOT explore documents just because they exist. Most autocomplete requests can be answered purely from the screenshot.
|
||||
- If you do read a document, only incorporate information that is 100% relevant to what the user is typing RIGHT NOW. Do not add extra details, background, or tangential information from the KB.
|
||||
- Keep your output SHORT — autocomplete should feel like a natural continuation, not an essay.
|
||||
|
||||
Key behavior:
|
||||
- If the text area is EMPTY, draft a concise response or message based on what you see on screen (e.g., reply to an email, respond to a chat message, continue a document).
|
||||
- If the text area already has text, continue it naturally — typically just a sentence or two.
|
||||
|
||||
Rules:
|
||||
- Be CONCISE. Prefer a single paragraph or a few sentences. Autocomplete is a quick assist, not a full draft.
|
||||
- Match the tone and formality of the surrounding context.
|
||||
- If the screen shows code, write code. If it shows a casual chat, be casual. If it shows a formal email, be formal.
|
||||
- Do NOT describe the screenshot or explain your reasoning.
|
||||
- Do NOT cite or reference documents explicitly — just let the knowledge inform your writing naturally.
|
||||
- If you cannot determine what to write, output an empty JSON array: []
|
||||
|
||||
## Output Format
|
||||
|
||||
You MUST provide exactly 3 different suggestion options. Each should be a distinct, plausible completion — vary the tone, detail level, or angle.
|
||||
|
||||
Return your suggestions as a JSON array of exactly 3 strings. Output ONLY the JSON array, nothing else — no markdown fences, no explanation, no commentary.
|
||||
|
||||
Example format:
|
||||
["First suggestion text here.", "Second suggestion — a different take.", "Third option with another approach."]
|
||||
|
||||
## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file`, `glob`, `grep`
|
||||
|
||||
All file paths must start with a `/`.
|
||||
- ls: list files and directories at a given path.
|
||||
- read_file: read a file from the filesystem.
|
||||
- write_file: create a temporary file in the session (not persisted).
|
||||
- edit_file: edit a file in the session (not persisted for /documents/ files).
|
||||
- glob: find files matching a pattern (e.g., "**/*.xml").
|
||||
- grep: search for text within files.
|
||||
|
||||
## When to Use Filesystem Tools
|
||||
|
||||
BEFORE reaching for any tool, ask yourself: "Can I write a good completion purely from the screenshot?" If yes, just write it — do NOT explore the KB.
|
||||
|
||||
Only use tools when:
|
||||
- The user is clearly writing about a specific topic that likely has detailed information in their KB.
|
||||
- You need a specific fact, name, number, or reference that the screenshot doesn't provide.
|
||||
|
||||
When you do use tools, be surgical:
|
||||
- Check the `ls` output first. If no document title looks relevant, stop — do not read files just to see what's there.
|
||||
- If a title looks relevant, read only the `<chunk_index>` (first ~20 lines) and jump to matched chunks. Do not read entire documents.
|
||||
- Extract only the specific information you need and move on to generating the completion.
|
||||
|
||||
## Reading Documents Efficiently
|
||||
|
||||
Documents are formatted as XML. Each document contains:
|
||||
- `<document_metadata>` — title, type, URL, etc.
|
||||
- `<chunk_index>` — a table of every chunk with its **line range** and a
|
||||
`matched="true"` flag for chunks that matched the search query.
|
||||
- `<document_content>` — the actual chunks in original document order.
|
||||
|
||||
**Workflow**: read the first ~20 lines to see the `<chunk_index>`, identify
|
||||
chunks marked `matched="true"`, then use `read_file(path, offset=<start_line>,
|
||||
limit=<lines>)` to jump directly to those sections."""
|
||||
|
||||
APP_CONTEXT_BLOCK = """
|
||||
|
||||
The user is currently working in "{app_name}" (window: "{window_title}"). Use this to understand the type of application and adapt your tone and format accordingly."""
|
||||
|
||||
|
||||
def _build_autocomplete_system_prompt(app_name: str, window_title: str) -> str:
|
||||
prompt = AUTOCOMPLETE_SYSTEM_PROMPT
|
||||
if app_name:
|
||||
prompt += APP_CONTEXT_BLOCK.format(app_name=app_name, window_title=window_title)
|
||||
return prompt
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pre-compute KB filesystem (runs in parallel with agent compilation)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _KBResult:
|
||||
"""Container for pre-computed KB filesystem results."""
|
||||
|
||||
__slots__ = ("files", "ls_ai_msg", "ls_tool_msg")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
files: dict[str, Any] | None = None,
|
||||
ls_ai_msg: AIMessage | None = None,
|
||||
ls_tool_msg: ToolMessage | None = None,
|
||||
) -> None:
|
||||
self.files = files
|
||||
self.ls_ai_msg = ls_ai_msg
|
||||
self.ls_tool_msg = ls_tool_msg
|
||||
|
||||
@property
|
||||
def has_documents(self) -> bool:
|
||||
return bool(self.files)
|
||||
|
||||
|
||||
async def precompute_kb_filesystem(
|
||||
search_space_id: int,
|
||||
query: str,
|
||||
top_k: int = KB_TOP_K,
|
||||
) -> _KBResult:
|
||||
"""Search the KB and build the scoped filesystem outside the agent.
|
||||
|
||||
This is designed to be called via ``asyncio.gather`` alongside agent
|
||||
graph compilation so the two run concurrently.
|
||||
"""
|
||||
if not query:
|
||||
return _KBResult()
|
||||
|
||||
try:
|
||||
search_results = await search_knowledge_base(
|
||||
query=query,
|
||||
search_space_id=search_space_id,
|
||||
top_k=top_k,
|
||||
)
|
||||
|
||||
if not search_results:
|
||||
return _KBResult()
|
||||
|
||||
new_files, _ = await _build_autocomplete_filesystem(
|
||||
documents=search_results,
|
||||
search_space_id=search_space_id,
|
||||
)
|
||||
|
||||
if not new_files:
|
||||
return _KBResult()
|
||||
|
||||
doc_paths = [
|
||||
p
|
||||
for p, v in new_files.items()
|
||||
if p.startswith("/documents/") and v is not None
|
||||
]
|
||||
tool_call_id = f"auto_ls_{uuid.uuid4().hex[:12]}"
|
||||
ai_msg = AIMessage(
|
||||
content="",
|
||||
tool_calls=[
|
||||
{"name": "ls", "args": {"path": "/documents"}, "id": tool_call_id}
|
||||
],
|
||||
)
|
||||
tool_msg = ToolMessage(
|
||||
content=str(doc_paths) if doc_paths else "No documents found.",
|
||||
tool_call_id=tool_call_id,
|
||||
)
|
||||
return _KBResult(files=new_files, ls_ai_msg=ai_msg, ls_tool_msg=tool_msg)
|
||||
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"KB pre-computation failed, proceeding without KB", exc_info=True
|
||||
)
|
||||
return _KBResult()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Filesystem middleware — no save_document, no persistence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class AutocompleteFilesystemMiddleware(SurfSenseFilesystemMiddleware):
|
||||
"""Filesystem middleware for autocomplete — read-only exploration only.
|
||||
|
||||
Passes ``search_space_id=None`` so the new persistence pipeline is
|
||||
bypassed; the autocomplete flow only reads, never commits to Postgres.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(search_space_id=None, created_by_id=None)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent factory
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _compile_agent(
|
||||
llm: BaseChatModel,
|
||||
app_name: str,
|
||||
window_title: str,
|
||||
) -> Any:
|
||||
"""Compile the agent graph (CPU-bound, runs in a thread)."""
|
||||
system_prompt = _build_autocomplete_system_prompt(app_name, window_title)
|
||||
final_system_prompt = system_prompt + "\n\n" + BASE_AGENT_PROMPT
|
||||
|
||||
middleware = [
|
||||
AutocompleteFilesystemMiddleware(),
|
||||
PatchToolCallsMiddleware(),
|
||||
AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"),
|
||||
]
|
||||
|
||||
agent = await asyncio.to_thread(
|
||||
create_agent,
|
||||
llm,
|
||||
system_prompt=final_system_prompt,
|
||||
tools=[],
|
||||
middleware=middleware,
|
||||
)
|
||||
return agent.with_config({"recursion_limit": 200})
|
||||
|
||||
|
||||
async def create_autocomplete_agent(
|
||||
llm: BaseChatModel,
|
||||
*,
|
||||
search_space_id: int,
|
||||
kb_query: str,
|
||||
app_name: str = "",
|
||||
window_title: str = "",
|
||||
) -> tuple[Any, _KBResult]:
|
||||
"""Create the autocomplete agent and pre-compute KB in parallel.
|
||||
|
||||
Returns ``(agent, kb_result)`` so the caller can inject the pre-computed
|
||||
filesystem into the agent's initial state without any middleware delay.
|
||||
"""
|
||||
agent, kb = await asyncio.gather(
|
||||
_compile_agent(llm, app_name, window_title),
|
||||
precompute_kb_filesystem(search_space_id, kb_query),
|
||||
)
|
||||
return agent, kb
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# JSON suggestion parsing (with fallback)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _parse_suggestions(raw: str) -> list[str]:
|
||||
"""Extract a list of suggestion strings from the agent's output.
|
||||
|
||||
Tries, in order:
|
||||
1. Direct ``json.loads``
|
||||
2. Extract content between ```json ... ``` fences
|
||||
3. Find the first ``[`` … ``]`` span
|
||||
Falls back to wrapping the raw text as a single suggestion.
|
||||
"""
|
||||
text = raw.strip()
|
||||
if not text:
|
||||
return []
|
||||
|
||||
for candidate in _json_candidates(text):
|
||||
try:
|
||||
parsed = json.loads(candidate)
|
||||
if isinstance(parsed, list) and all(isinstance(s, str) for s in parsed):
|
||||
return [s for s in parsed if s.strip()]
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
|
||||
return [text]
|
||||
|
||||
|
||||
def _json_candidates(text: str) -> list[str]:
|
||||
"""Yield candidate JSON strings from raw text."""
|
||||
candidates = [text]
|
||||
|
||||
fence = re.search(r"```(?:json)?\s*\n?(.*?)```", text, re.DOTALL)
|
||||
if fence:
|
||||
candidates.append(fence.group(1).strip())
|
||||
|
||||
bracket = re.search(r"\[.*]", text, re.DOTALL)
|
||||
if bracket:
|
||||
candidates.append(bracket.group(0))
|
||||
|
||||
return candidates
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Streaming helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def stream_autocomplete_agent(
|
||||
agent: Any,
|
||||
input_data: dict[str, Any],
|
||||
streaming_service: VercelStreamingService,
|
||||
*,
|
||||
emit_message_start: bool = True,
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Stream agent events as Vercel SSE, with thinking steps for tool calls.
|
||||
|
||||
When ``emit_message_start`` is False the caller has already sent the
|
||||
``message_start`` event (e.g. to show preparation steps before the agent
|
||||
runs).
|
||||
"""
|
||||
thread_id = uuid.uuid4().hex
|
||||
config = {"configurable": {"thread_id": thread_id}}
|
||||
|
||||
text_buffer: list[str] = []
|
||||
active_tool_depth = 0
|
||||
thinking_step_counter = 0
|
||||
tool_step_ids: dict[str, str] = {}
|
||||
step_titles: dict[str, str] = {}
|
||||
completed_step_ids: set[str] = set()
|
||||
last_active_step_id: str | None = None
|
||||
|
||||
def next_thinking_step_id() -> str:
|
||||
nonlocal thinking_step_counter
|
||||
thinking_step_counter += 1
|
||||
return f"autocomplete-step-{thinking_step_counter}"
|
||||
|
||||
def complete_current_step() -> str | None:
|
||||
nonlocal last_active_step_id
|
||||
if last_active_step_id and last_active_step_id not in completed_step_ids:
|
||||
completed_step_ids.add(last_active_step_id)
|
||||
title = step_titles.get(last_active_step_id, "Done")
|
||||
event = streaming_service.format_thinking_step(
|
||||
step_id=last_active_step_id,
|
||||
title=title,
|
||||
status="complete",
|
||||
)
|
||||
last_active_step_id = None
|
||||
return event
|
||||
return None
|
||||
|
||||
if emit_message_start:
|
||||
yield streaming_service.format_message_start()
|
||||
|
||||
gen_step_id = next_thinking_step_id()
|
||||
last_active_step_id = gen_step_id
|
||||
step_titles[gen_step_id] = "Generating suggestions"
|
||||
yield streaming_service.format_thinking_step(
|
||||
step_id=gen_step_id,
|
||||
title="Generating suggestions",
|
||||
status="in_progress",
|
||||
)
|
||||
|
||||
try:
|
||||
async for event in agent.astream_events(
|
||||
input_data, config=config, version="v2"
|
||||
):
|
||||
event_type = event.get("event", "")
|
||||
if event_type == "on_chat_model_stream":
|
||||
if active_tool_depth > 0:
|
||||
continue
|
||||
if "surfsense:internal" in event.get("tags", []):
|
||||
continue
|
||||
chunk = event.get("data", {}).get("chunk")
|
||||
if chunk and hasattr(chunk, "content"):
|
||||
content = chunk.content
|
||||
if content and isinstance(content, str):
|
||||
text_buffer.append(content)
|
||||
|
||||
elif event_type == "on_chat_model_end":
|
||||
if active_tool_depth > 0:
|
||||
continue
|
||||
if "surfsense:internal" in event.get("tags", []):
|
||||
continue
|
||||
output = event.get("data", {}).get("output")
|
||||
if output and hasattr(output, "content"):
|
||||
if getattr(output, "tool_calls", None):
|
||||
continue
|
||||
content = output.content
|
||||
if content and isinstance(content, str) and not text_buffer:
|
||||
text_buffer.append(content)
|
||||
|
||||
elif event_type == "on_tool_start":
|
||||
active_tool_depth += 1
|
||||
tool_name = event.get("name", "unknown_tool")
|
||||
run_id = event.get("run_id", "")
|
||||
tool_input = event.get("data", {}).get("input", {})
|
||||
|
||||
step_event = complete_current_step()
|
||||
if step_event:
|
||||
yield step_event
|
||||
|
||||
tool_step_id = next_thinking_step_id()
|
||||
tool_step_ids[run_id] = tool_step_id
|
||||
last_active_step_id = tool_step_id
|
||||
|
||||
title, items = _describe_tool_call(tool_name, tool_input)
|
||||
step_titles[tool_step_id] = title
|
||||
yield streaming_service.format_thinking_step(
|
||||
step_id=tool_step_id,
|
||||
title=title,
|
||||
status="in_progress",
|
||||
items=items,
|
||||
)
|
||||
|
||||
elif event_type == "on_tool_end":
|
||||
active_tool_depth = max(0, active_tool_depth - 1)
|
||||
run_id = event.get("run_id", "")
|
||||
step_id = tool_step_ids.pop(run_id, None)
|
||||
if step_id and step_id not in completed_step_ids:
|
||||
completed_step_ids.add(step_id)
|
||||
title = step_titles.get(step_id, "Done")
|
||||
yield streaming_service.format_thinking_step(
|
||||
step_id=step_id,
|
||||
title=title,
|
||||
status="complete",
|
||||
)
|
||||
if last_active_step_id == step_id:
|
||||
last_active_step_id = None
|
||||
|
||||
step_event = complete_current_step()
|
||||
if step_event:
|
||||
yield step_event
|
||||
|
||||
raw_text = "".join(text_buffer)
|
||||
suggestions = _parse_suggestions(raw_text)
|
||||
|
||||
yield streaming_service.format_data("suggestions", {"options": suggestions})
|
||||
|
||||
yield streaming_service.format_finish()
|
||||
yield streaming_service.format_done()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Autocomplete agent streaming error: {e}", exc_info=True)
|
||||
yield streaming_service.format_error("Autocomplete failed. Please try again.")
|
||||
yield streaming_service.format_done()
|
||||
|
||||
|
||||
def _describe_tool_call(tool_name: str, tool_input: Any) -> tuple[str, list[str]]:
|
||||
"""Return a human-readable (title, items) for a tool call thinking step."""
|
||||
inp = tool_input if isinstance(tool_input, dict) else {}
|
||||
if tool_name == "ls":
|
||||
path = inp.get("path", "/")
|
||||
return "Listing files", [path]
|
||||
if tool_name == "read_file":
|
||||
fp = inp.get("file_path", "")
|
||||
display = fp if len(fp) <= 80 else "…" + fp[-77:]
|
||||
return "Reading file", [display]
|
||||
if tool_name == "write_file":
|
||||
fp = inp.get("file_path", "")
|
||||
display = fp if len(fp) <= 80 else "…" + fp[-77:]
|
||||
return "Writing file", [display]
|
||||
if tool_name == "edit_file":
|
||||
fp = inp.get("file_path", "")
|
||||
display = fp if len(fp) <= 80 else "…" + fp[-77:]
|
||||
return "Editing file", [display]
|
||||
if tool_name == "glob":
|
||||
pat = inp.get("pattern", "")
|
||||
base = inp.get("path", "/")
|
||||
return "Searching files", [f"{pat} in {base}"]
|
||||
if tool_name == "grep":
|
||||
pat = inp.get("pattern", "")
|
||||
path = inp.get("path", "")
|
||||
display_pat = pat[:60] + ("…" if len(pat) > 60 else "")
|
||||
return "Searching content", [
|
||||
f'"{display_pat}"' + (f" in {path}" if path else "")
|
||||
]
|
||||
return f"Using {tool_name}", []
|
||||
|
|
@ -1,50 +0,0 @@
|
|||
"""Provider-specific style hints from ``prompts/providers/`` (main agent only)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from .load_md import read_prompt_md
|
||||
|
||||
ProviderVariant = str
|
||||
|
||||
_OPENAI_CODEX_RE = re.compile(
|
||||
r"\b(gpt-codex|codex-mini|gpt-[\d.]+-codex)\b", re.IGNORECASE
|
||||
)
|
||||
_OPENAI_REASONING_RE = re.compile(r"\b(gpt-5|o\d|o-)", re.IGNORECASE)
|
||||
_OPENAI_CLASSIC_RE = re.compile(r"\bgpt-4", re.IGNORECASE)
|
||||
_ANTHROPIC_RE = re.compile(r"\bclaude\b", re.IGNORECASE)
|
||||
_GOOGLE_RE = re.compile(r"\bgemini\b", re.IGNORECASE)
|
||||
_KIMI_RE = re.compile(r"\b(kimi[-\d.]*|moonshot)\b", re.IGNORECASE)
|
||||
_GROK_RE = re.compile(r"\bgrok\b", re.IGNORECASE)
|
||||
_DEEPSEEK_RE = re.compile(r"\bdeepseek\b", re.IGNORECASE)
|
||||
|
||||
|
||||
def detect_provider_variant(model_name: str | None) -> ProviderVariant:
|
||||
if not model_name:
|
||||
return "default"
|
||||
name = model_name.strip()
|
||||
if _OPENAI_CODEX_RE.search(name):
|
||||
return "openai_codex"
|
||||
if _OPENAI_REASONING_RE.search(name):
|
||||
return "openai_reasoning"
|
||||
if _OPENAI_CLASSIC_RE.search(name):
|
||||
return "openai_classic"
|
||||
if _ANTHROPIC_RE.search(name):
|
||||
return "anthropic"
|
||||
if _GOOGLE_RE.search(name):
|
||||
return "google"
|
||||
if _KIMI_RE.search(name):
|
||||
return "kimi"
|
||||
if _GROK_RE.search(name):
|
||||
return "grok"
|
||||
if _DEEPSEEK_RE.search(name):
|
||||
return "deepseek"
|
||||
return "default"
|
||||
|
||||
|
||||
def build_provider_hint_block(provider_variant: ProviderVariant) -> str:
|
||||
if not provider_variant or provider_variant == "default":
|
||||
return ""
|
||||
text = read_prompt_md(f"providers/{provider_variant}.md")
|
||||
return f"\n{text}\n" if text else ""
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
"""Provider-specific style hints."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ..provider_hints import build_provider_hint_block, detect_provider_variant
|
||||
|
||||
|
||||
def build_provider_section(*, model_name: str | None) -> str:
|
||||
return build_provider_hint_block(detect_provider_variant(model_name))
|
||||
|
|
@ -1,11 +0,0 @@
|
|||
"""Linear tools for creating, updating, and deleting issues."""
|
||||
|
||||
from .create_issue import create_create_linear_issue_tool
|
||||
from .delete_issue import create_delete_linear_issue_tool
|
||||
from .update_issue import create_update_linear_issue_tool
|
||||
|
||||
__all__ = [
|
||||
"create_create_linear_issue_tool",
|
||||
"create_delete_linear_issue_tool",
|
||||
"create_update_linear_issue_tool",
|
||||
]
|
||||
|
|
@ -1,266 +0,0 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.agents.new_chat.tools.hitl import request_approval
|
||||
from app.connectors.linear_connector import LinearAPIError, LinearConnector
|
||||
from app.db import async_session_maker
|
||||
from app.services.linear import LinearToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_create_linear_issue_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
connector_id: int | None = None,
|
||||
):
|
||||
"""Factory function to create the create_linear_issue tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker`. This is critical for the compiled-agent
|
||||
cache: the compiled graph (and therefore this closure) is reused
|
||||
across HTTP requests, so capturing a per-request session here would
|
||||
surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
search_space_id: Search space ID to find the Linear connector
|
||||
user_id: User ID for fetching user-specific context
|
||||
connector_id: Optional specific connector ID (if known)
|
||||
|
||||
Returns:
|
||||
Configured create_linear_issue tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def create_linear_issue(
|
||||
title: str,
|
||||
description: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Create a new issue in Linear.
|
||||
|
||||
Use this tool when the user explicitly asks to create, add, or file
|
||||
a new issue / ticket / task in Linear. The user MUST describe the issue
|
||||
before you call this tool. If the request is vague, ask what the issue
|
||||
should be about. Never call this tool without a clear topic from the user.
|
||||
|
||||
Args:
|
||||
title: Short, descriptive issue title. Infer from the user's request.
|
||||
description: Optional markdown body for the issue. Generate from context.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", or "error"
|
||||
- issue_id: Linear issue UUID (if success)
|
||||
- identifier: Human-readable ID like "ENG-42" (if success)
|
||||
- url: URL to the created issue (if success)
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT: If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment (e.g., "Understood, I won't create the issue.")
|
||||
and move on. Do NOT retry, troubleshoot, or suggest alternatives.
|
||||
|
||||
Examples:
|
||||
- "Create a Linear issue for the login bug"
|
||||
- "File a ticket about the payment timeout problem"
|
||||
- "Add an issue for the broken search feature"
|
||||
"""
|
||||
logger.info(f"create_linear_issue called: title='{title}'")
|
||||
|
||||
if search_space_id is None or user_id is None:
|
||||
logger.error(
|
||||
"Linear tool not properly configured - missing required parameters"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Linear tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
metadata_service = LinearToolMetadataService(db_session)
|
||||
context = await metadata_service.get_creation_context(
|
||||
search_space_id, user_id
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
logger.error(
|
||||
f"Failed to fetch creation context: {context['error']}"
|
||||
)
|
||||
return {"status": "error", "message": context["error"]}
|
||||
|
||||
workspaces = context.get("workspaces", [])
|
||||
if workspaces and all(w.get("auth_expired") for w in workspaces):
|
||||
logger.warning("All Linear accounts have expired authentication")
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": "All connected Linear accounts need re-authentication. Please re-authenticate in your connector settings.",
|
||||
"connector_type": "linear",
|
||||
}
|
||||
|
||||
logger.info(f"Requesting approval for creating Linear issue: '{title}'")
|
||||
result = request_approval(
|
||||
action_type="linear_issue_creation",
|
||||
tool_name="create_linear_issue",
|
||||
params={
|
||||
"title": title,
|
||||
"description": description,
|
||||
"team_id": None,
|
||||
"state_id": None,
|
||||
"assignee_id": None,
|
||||
"priority": None,
|
||||
"label_ids": [],
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
context=context,
|
||||
)
|
||||
|
||||
if result.rejected:
|
||||
logger.info("Linear issue creation rejected by user")
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. Do not retry or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_title = result.params.get("title", title)
|
||||
final_description = result.params.get("description", description)
|
||||
final_team_id = result.params.get("team_id")
|
||||
final_state_id = result.params.get("state_id")
|
||||
final_assignee_id = result.params.get("assignee_id")
|
||||
final_priority = result.params.get("priority")
|
||||
final_label_ids = result.params.get("label_ids") or []
|
||||
final_connector_id = result.params.get("connector_id", connector_id)
|
||||
|
||||
if not final_title or not final_title.strip():
|
||||
logger.error("Title is empty or contains only whitespace")
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Issue title cannot be empty.",
|
||||
}
|
||||
if not final_team_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "A team must be selected to create an issue.",
|
||||
}
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
actual_connector_id = final_connector_id
|
||||
if actual_connector_id is None:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No Linear connector found. Please connect Linear in your workspace settings.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
logger.info(f"Found Linear connector: id={actual_connector_id}")
|
||||
else:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == actual_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Linear connector is invalid or has been disconnected.",
|
||||
}
|
||||
logger.info(f"Validated Linear connector: id={actual_connector_id}")
|
||||
|
||||
logger.info(
|
||||
f"Creating Linear issue with final params: title='{final_title}'"
|
||||
)
|
||||
linear_client = LinearConnector(
|
||||
session=db_session, connector_id=actual_connector_id
|
||||
)
|
||||
result = await linear_client.create_issue(
|
||||
team_id=final_team_id,
|
||||
title=final_title,
|
||||
description=final_description,
|
||||
state_id=final_state_id,
|
||||
assignee_id=final_assignee_id,
|
||||
priority=final_priority,
|
||||
label_ids=final_label_ids if final_label_ids else None,
|
||||
)
|
||||
|
||||
if result.get("status") == "error":
|
||||
logger.error(
|
||||
f"Failed to create Linear issue: {result.get('message')}"
|
||||
)
|
||||
return {"status": "error", "message": result.get("message")}
|
||||
|
||||
logger.info(
|
||||
f"Linear issue created: {result.get('identifier')} - {result.get('title')}"
|
||||
)
|
||||
|
||||
kb_message_suffix = ""
|
||||
try:
|
||||
from app.services.linear import LinearKBSyncService
|
||||
|
||||
kb_service = LinearKBSyncService(db_session)
|
||||
kb_result = await kb_service.sync_after_create(
|
||||
issue_id=result.get("id"),
|
||||
issue_identifier=result.get("identifier", ""),
|
||||
issue_title=result.get("title", final_title),
|
||||
issue_url=result.get("url"),
|
||||
description=final_description,
|
||||
connector_id=actual_connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
if kb_result["status"] == "success":
|
||||
kb_message_suffix = (
|
||||
" Your knowledge base has also been updated."
|
||||
)
|
||||
else:
|
||||
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
|
||||
except Exception as kb_err:
|
||||
logger.warning(f"KB sync after create failed: {kb_err}")
|
||||
kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync."
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"issue_id": result.get("id"),
|
||||
"identifier": result.get("identifier"),
|
||||
"url": result.get("url"),
|
||||
"message": (result.get("message", "") + kb_message_suffix),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error creating Linear issue: {e}", exc_info=True)
|
||||
if isinstance(e, ValueError | LinearAPIError):
|
||||
message = str(e)
|
||||
else:
|
||||
message = (
|
||||
"Something went wrong while creating the issue. Please try again."
|
||||
)
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return create_linear_issue
|
||||
|
|
@ -1,256 +0,0 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.agents.new_chat.tools.hitl import request_approval
|
||||
from app.connectors.linear_connector import LinearAPIError, LinearConnector
|
||||
from app.db import async_session_maker
|
||||
from app.services.linear import LinearToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_delete_linear_issue_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
connector_id: int | None = None,
|
||||
):
|
||||
"""Factory function to create the delete_linear_issue tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker`. This is critical for the compiled-agent
|
||||
cache: the compiled graph (and therefore this closure) is reused
|
||||
across HTTP requests, so capturing a per-request session here would
|
||||
surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
search_space_id: Search space ID to find the Linear connector
|
||||
user_id: User ID for finding the correct Linear connector
|
||||
connector_id: Optional specific connector ID (if known)
|
||||
|
||||
Returns:
|
||||
Configured delete_linear_issue tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def delete_linear_issue(
|
||||
issue_ref: str,
|
||||
delete_from_kb: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Archive (delete) a Linear issue.
|
||||
|
||||
Use this tool when the user asks to delete, remove, or archive a Linear issue.
|
||||
Note that Linear archives issues rather than permanently deleting them
|
||||
(they can be restored from the archive).
|
||||
|
||||
|
||||
Args:
|
||||
issue_ref: The issue to delete. Can be the issue title (e.g. "Fix login bug"),
|
||||
the identifier (e.g. "ENG-42"), or the full document title
|
||||
(e.g. "ENG-42: Fix login bug").
|
||||
delete_from_kb: Whether to also remove the issue from the knowledge base.
|
||||
Default is False. Set to True to remove from both Linear
|
||||
and the knowledge base.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", "not_found", or "error"
|
||||
- identifier: Human-readable ID like "ENG-42" (if success)
|
||||
- message: Success or error message
|
||||
- deleted_from_kb: Whether the issue was also removed from the knowledge base (if success)
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment (e.g., "Understood, I won't delete the issue.")
|
||||
and move on. Do NOT ask for alternatives or troubleshoot.
|
||||
- If status is "not_found", inform the user conversationally using the exact message
|
||||
provided. Do NOT treat this as an error. Simply relay the message and ask the user
|
||||
to verify the issue title or identifier, or check if it has been indexed.
|
||||
Examples:
|
||||
- "Delete the 'Fix login bug' Linear issue"
|
||||
- "Archive ENG-42"
|
||||
- "Remove the 'Old payment flow' issue from Linear"
|
||||
"""
|
||||
logger.info(
|
||||
f"delete_linear_issue called: issue_ref='{issue_ref}', delete_from_kb={delete_from_kb}"
|
||||
)
|
||||
|
||||
if search_space_id is None or user_id is None:
|
||||
logger.error(
|
||||
"Linear tool not properly configured - missing required parameters"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Linear tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
metadata_service = LinearToolMetadataService(db_session)
|
||||
context = await metadata_service.get_delete_context(
|
||||
search_space_id, user_id, issue_ref
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
error_msg = context["error"]
|
||||
if context.get("auth_expired"):
|
||||
logger.warning(f"Auth expired for delete context: {error_msg}")
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": error_msg,
|
||||
"connector_id": context.get("connector_id"),
|
||||
"connector_type": "linear",
|
||||
}
|
||||
if "not found" in error_msg.lower():
|
||||
logger.warning(f"Issue not found: {error_msg}")
|
||||
return {"status": "not_found", "message": error_msg}
|
||||
else:
|
||||
logger.error(f"Failed to fetch delete context: {error_msg}")
|
||||
return {"status": "error", "message": error_msg}
|
||||
|
||||
issue_id = context["issue"]["id"]
|
||||
issue_identifier = context["issue"].get("identifier", "")
|
||||
document_id = context["issue"]["document_id"]
|
||||
connector_id_from_context = context.get("workspace", {}).get("id")
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for deleting Linear issue: '{issue_ref}' "
|
||||
f"(id={issue_id}, delete_from_kb={delete_from_kb})"
|
||||
)
|
||||
result = request_approval(
|
||||
action_type="linear_issue_deletion",
|
||||
tool_name="delete_linear_issue",
|
||||
params={
|
||||
"issue_id": issue_id,
|
||||
"connector_id": connector_id_from_context,
|
||||
"delete_from_kb": delete_from_kb,
|
||||
},
|
||||
context=context,
|
||||
)
|
||||
|
||||
if result.rejected:
|
||||
logger.info("Linear issue deletion rejected by user")
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. Do not retry or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_issue_id = result.params.get("issue_id", issue_id)
|
||||
final_connector_id = result.params.get(
|
||||
"connector_id", connector_id_from_context
|
||||
)
|
||||
final_delete_from_kb = result.params.get(
|
||||
"delete_from_kb", delete_from_kb
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Deleting Linear issue with final params: issue_id={final_issue_id}, "
|
||||
f"connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}"
|
||||
)
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
if final_connector_id:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
logger.error(
|
||||
f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Linear connector is invalid or has been disconnected.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
logger.info(f"Validated Linear connector: id={actual_connector_id}")
|
||||
else:
|
||||
logger.error("No connector found for this issue")
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No connector found for this issue.",
|
||||
}
|
||||
|
||||
linear_client = LinearConnector(
|
||||
session=db_session, connector_id=actual_connector_id
|
||||
)
|
||||
|
||||
result = await linear_client.archive_issue(issue_id=final_issue_id)
|
||||
|
||||
logger.info(
|
||||
f"archive_issue result: {result.get('status')} - {result.get('message', '')}"
|
||||
)
|
||||
|
||||
deleted_from_kb = False
|
||||
if (
|
||||
result.get("status") == "success"
|
||||
and final_delete_from_kb
|
||||
and document_id
|
||||
):
|
||||
try:
|
||||
from app.db import Document
|
||||
|
||||
doc_result = await db_session.execute(
|
||||
select(Document).filter(Document.id == document_id)
|
||||
)
|
||||
document = doc_result.scalars().first()
|
||||
if document:
|
||||
await db_session.delete(document)
|
||||
await db_session.commit()
|
||||
deleted_from_kb = True
|
||||
logger.info(
|
||||
f"Deleted document {document_id} from knowledge base"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Document {document_id} not found in KB")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete document from KB: {e}")
|
||||
await db_session.rollback()
|
||||
result["warning"] = (
|
||||
f"Issue archived in Linear, but failed to remove from knowledge base: {e!s}"
|
||||
)
|
||||
|
||||
if result.get("status") == "success":
|
||||
result["deleted_from_kb"] = deleted_from_kb
|
||||
if issue_identifier:
|
||||
result["message"] = (
|
||||
f"Issue {issue_identifier} archived successfully."
|
||||
)
|
||||
if deleted_from_kb:
|
||||
result["message"] = (
|
||||
f"{result.get('message', '')} Also removed from the knowledge base."
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error deleting Linear issue: {e}", exc_info=True)
|
||||
if isinstance(e, ValueError | LinearAPIError):
|
||||
message = str(e)
|
||||
else:
|
||||
message = (
|
||||
"Something went wrong while deleting the issue. Please try again."
|
||||
)
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return delete_linear_issue
|
||||
|
|
@ -1,327 +0,0 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.agents.new_chat.tools.hitl import request_approval
|
||||
from app.connectors.linear_connector import LinearAPIError, LinearConnector
|
||||
from app.db import async_session_maker
|
||||
from app.services.linear import LinearKBSyncService, LinearToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_update_linear_issue_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
connector_id: int | None = None,
|
||||
):
|
||||
"""Factory function to create the update_linear_issue tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker`. This is critical for the compiled-agent
|
||||
cache: the compiled graph (and therefore this closure) is reused
|
||||
across HTTP requests, so capturing a per-request session here would
|
||||
surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
search_space_id: Search space ID to find the Linear connector
|
||||
user_id: User ID for fetching user-specific context
|
||||
connector_id: Optional specific connector ID (if known)
|
||||
|
||||
Returns:
|
||||
Configured update_linear_issue tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def update_linear_issue(
|
||||
issue_ref: str,
|
||||
new_title: str | None = None,
|
||||
new_description: str | None = None,
|
||||
new_state_name: str | None = None,
|
||||
new_assignee_email: str | None = None,
|
||||
new_priority: int | None = None,
|
||||
new_label_names: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Update an existing Linear issue that has been indexed in the knowledge base.
|
||||
|
||||
Use this tool when the user asks to modify, change, or update a Linear issue —
|
||||
for example, changing its status, reassigning it, updating its title or description,
|
||||
adjusting its priority, or changing its labels.
|
||||
|
||||
Only issues already indexed in the knowledge base can be updated.
|
||||
|
||||
Args:
|
||||
issue_ref: The issue to update. Can be the issue title (e.g. "Fix login bug"),
|
||||
the identifier (e.g. "ENG-42"), or the full document title
|
||||
(e.g. "ENG-42: Fix login bug"). Matched case-insensitively.
|
||||
new_title: New title for the issue (optional).
|
||||
new_description: New markdown body for the issue (optional).
|
||||
new_state_name: New workflow state name (e.g. "In Progress", "Done").
|
||||
Matched case-insensitively against the team's states.
|
||||
new_assignee_email: Email address of the new assignee.
|
||||
Matched case-insensitively against the team's members.
|
||||
new_priority: New priority (0 = No Priority, 1 = Urgent, 2 = High,
|
||||
3 = Medium, 4 = Low).
|
||||
new_label_names: New set of label names to apply.
|
||||
Matched case-insensitively against the team's labels.
|
||||
Unrecognised names are silently skipped.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", "not_found", or "error"
|
||||
- identifier: Human-readable ID like "ENG-42" (if success)
|
||||
- url: URL to the updated issue (if success)
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment (e.g., "Understood, I didn't update the issue.")
|
||||
and move on. Do NOT ask for alternatives or troubleshoot.
|
||||
- If status is "not_found", inform the user conversationally using the exact message
|
||||
provided. Do NOT treat this as an error. Simply relay the message and ask the user
|
||||
to verify the issue title or identifier, or check if it has been indexed.
|
||||
|
||||
Examples:
|
||||
- "Mark the 'Fix login bug' issue as done"
|
||||
- "Assign ENG-42 to john@company.com"
|
||||
- "Change the priority of 'Payment timeout' to urgent"
|
||||
"""
|
||||
logger.info(f"update_linear_issue called: issue_ref='{issue_ref}'")
|
||||
|
||||
if search_space_id is None or user_id is None:
|
||||
logger.error(
|
||||
"Linear tool not properly configured - missing required parameters"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Linear tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
metadata_service = LinearToolMetadataService(db_session)
|
||||
context = await metadata_service.get_update_context(
|
||||
search_space_id, user_id, issue_ref
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
error_msg = context["error"]
|
||||
if context.get("auth_expired"):
|
||||
logger.warning(f"Auth expired for update context: {error_msg}")
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": error_msg,
|
||||
"connector_id": context.get("connector_id"),
|
||||
"connector_type": "linear",
|
||||
}
|
||||
if "not found" in error_msg.lower():
|
||||
logger.warning(f"Issue not found: {error_msg}")
|
||||
return {"status": "not_found", "message": error_msg}
|
||||
else:
|
||||
logger.error(f"Failed to fetch update context: {error_msg}")
|
||||
return {"status": "error", "message": error_msg}
|
||||
|
||||
issue_id = context["issue"]["id"]
|
||||
document_id = context["issue"]["document_id"]
|
||||
connector_id_from_context = context.get("workspace", {}).get("id")
|
||||
|
||||
team = context.get("team", {})
|
||||
new_state_id = _resolve_state(team, new_state_name)
|
||||
new_assignee_id = _resolve_assignee(team, new_assignee_email)
|
||||
new_label_ids = _resolve_labels(team, new_label_names)
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for updating Linear issue: '{issue_ref}' (id={issue_id})"
|
||||
)
|
||||
result = request_approval(
|
||||
action_type="linear_issue_update",
|
||||
tool_name="update_linear_issue",
|
||||
params={
|
||||
"issue_id": issue_id,
|
||||
"document_id": document_id,
|
||||
"new_title": new_title,
|
||||
"new_description": new_description,
|
||||
"new_state_id": new_state_id,
|
||||
"new_assignee_id": new_assignee_id,
|
||||
"new_priority": new_priority,
|
||||
"new_label_ids": new_label_ids,
|
||||
"connector_id": connector_id_from_context,
|
||||
},
|
||||
context=context,
|
||||
)
|
||||
|
||||
if result.rejected:
|
||||
logger.info("Linear issue update rejected by user")
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. Do not retry or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_issue_id = result.params.get("issue_id", issue_id)
|
||||
final_document_id = result.params.get("document_id", document_id)
|
||||
final_new_title = result.params.get("new_title", new_title)
|
||||
final_new_description = result.params.get(
|
||||
"new_description", new_description
|
||||
)
|
||||
final_new_state_id = result.params.get("new_state_id", new_state_id)
|
||||
final_new_assignee_id = result.params.get(
|
||||
"new_assignee_id", new_assignee_id
|
||||
)
|
||||
final_new_priority = result.params.get("new_priority", new_priority)
|
||||
final_new_label_ids: list[str] | None = result.params.get(
|
||||
"new_label_ids", new_label_ids
|
||||
)
|
||||
final_connector_id = result.params.get(
|
||||
"connector_id", connector_id_from_context
|
||||
)
|
||||
|
||||
if not final_connector_id:
|
||||
logger.error("No connector found for this issue")
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No connector found for this issue.",
|
||||
}
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.LINEAR_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
logger.error(
|
||||
f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Linear connector is invalid or has been disconnected.",
|
||||
}
|
||||
logger.info(f"Validated Linear connector: id={final_connector_id}")
|
||||
|
||||
logger.info(
|
||||
f"Updating Linear issue with final params: issue_id={final_issue_id}"
|
||||
)
|
||||
linear_client = LinearConnector(
|
||||
session=db_session, connector_id=final_connector_id
|
||||
)
|
||||
updated_issue = await linear_client.update_issue(
|
||||
issue_id=final_issue_id,
|
||||
title=final_new_title,
|
||||
description=final_new_description,
|
||||
state_id=final_new_state_id,
|
||||
assignee_id=final_new_assignee_id,
|
||||
priority=final_new_priority,
|
||||
label_ids=final_new_label_ids,
|
||||
)
|
||||
|
||||
if updated_issue.get("status") == "error":
|
||||
logger.error(
|
||||
f"Failed to update Linear issue: {updated_issue.get('message')}"
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": updated_issue.get("message"),
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"update_issue result: {updated_issue.get('identifier')} - {updated_issue.get('title')}"
|
||||
)
|
||||
|
||||
if final_document_id is not None:
|
||||
logger.info(
|
||||
f"Updating knowledge base for document {final_document_id}..."
|
||||
)
|
||||
kb_service = LinearKBSyncService(db_session)
|
||||
kb_result = await kb_service.sync_after_update(
|
||||
document_id=final_document_id,
|
||||
issue_id=final_issue_id,
|
||||
user_id=user_id,
|
||||
search_space_id=search_space_id,
|
||||
)
|
||||
if kb_result["status"] == "success":
|
||||
logger.info(
|
||||
f"Knowledge base successfully updated for issue {final_issue_id}"
|
||||
)
|
||||
kb_message = " Your knowledge base has also been updated."
|
||||
elif kb_result["status"] == "not_indexed":
|
||||
kb_message = " This issue will be added to your knowledge base in the next scheduled sync."
|
||||
else:
|
||||
logger.warning(
|
||||
f"KB update failed for issue {final_issue_id}: {kb_result.get('message')}"
|
||||
)
|
||||
kb_message = " Your knowledge base will be updated in the next scheduled sync."
|
||||
else:
|
||||
kb_message = ""
|
||||
|
||||
identifier = updated_issue.get("identifier")
|
||||
default_msg = f"Issue {identifier} updated successfully."
|
||||
return {
|
||||
"status": "success",
|
||||
"identifier": identifier,
|
||||
"url": updated_issue.get("url"),
|
||||
"message": f"{updated_issue.get('message', default_msg)}{kb_message}",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error updating Linear issue: {e}", exc_info=True)
|
||||
if isinstance(e, ValueError | LinearAPIError):
|
||||
message = str(e)
|
||||
else:
|
||||
message = (
|
||||
"Something went wrong while updating the issue. Please try again."
|
||||
)
|
||||
return {"status": "error", "message": message}
|
||||
|
||||
return update_linear_issue
|
||||
|
||||
|
||||
def _resolve_state(team: dict, state_name: str | None) -> str | None:
|
||||
if not state_name:
|
||||
return None
|
||||
name_lower = state_name.lower()
|
||||
for state in team.get("states", []):
|
||||
if state.get("name", "").lower() == name_lower:
|
||||
return state["id"]
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_assignee(team: dict, assignee_email: str | None) -> str | None:
|
||||
if not assignee_email:
|
||||
return None
|
||||
email_lower = assignee_email.lower()
|
||||
for member in team.get("members", []):
|
||||
if member.get("email", "").lower() == email_lower:
|
||||
return member["id"]
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_labels(team: dict, label_names: list[str] | None) -> list[str] | None:
|
||||
if label_names is None:
|
||||
return None
|
||||
if not label_names:
|
||||
return []
|
||||
name_set = {n.lower() for n in label_names}
|
||||
return [
|
||||
label["id"]
|
||||
for label in team.get("labels", [])
|
||||
if label.get("name", "").lower() in name_set
|
||||
]
|
||||
|
|
@ -1,38 +0,0 @@
|
|||
"""Standardised response dict factories for LangChain agent tools."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ToolResponse:
|
||||
@staticmethod
|
||||
def success(message: str, **data: Any) -> dict[str, Any]:
|
||||
return {"status": "success", "message": message, **data}
|
||||
|
||||
@staticmethod
|
||||
def error(error: str, **data: Any) -> dict[str, Any]:
|
||||
return {"status": "error", "error": error, **data}
|
||||
|
||||
@staticmethod
|
||||
def auth_error(service: str, **data: Any) -> dict[str, Any]:
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"error": (
|
||||
f"{service} authentication has expired or been revoked. "
|
||||
"Please re-connect the integration in Settings → Connectors."
|
||||
),
|
||||
**data,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def rejected(message: str = "Action was declined by the user.") -> dict[str, Any]:
|
||||
return {"status": "rejected", "message": message}
|
||||
|
||||
@staticmethod
|
||||
def not_found(resource: str, identifier: str, **data: Any) -> dict[str, Any]:
|
||||
return {
|
||||
"status": "not_found",
|
||||
"error": f"{resource} '{identifier}' was not found.",
|
||||
**data,
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue