Add builtin research route slice for delegated agents.

This commit is contained in:
CREDO23 2026-05-01 20:30:20 +02:00
parent 7080b787d1
commit b9bc06e7b4
10 changed files with 832 additions and 0 deletions

View file

@ -0,0 +1,54 @@
"""`research` route: ``SubAgent`` spec for deepagents."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from deepagents import SubAgent
from langchain_core.language_models import BaseChatModel
from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import (
read_md_file,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
merge_tools_permissions,
)
from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import (
pack_subagent,
)
from .tools.index import load_tools
NAME = "research"
def build_subagent(
*,
dependencies: dict[str, Any],
model: BaseChatModel | None = None,
extra_middleware: Sequence[Any] | None = None,
extra_tools_bucket: ToolsPermissions | None = None,
) -> SubAgent:
buckets = load_tools(dependencies=dependencies)
merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket)
tools = [
row["tool"]
for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"])
if row.get("tool") is not None
]
interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")}
description = read_md_file(__package__, "description").strip()
if not description:
description = "Handles research tasks for this workspace."
system_prompt = read_md_file(__package__, "system_prompt").strip()
return pack_subagent(
name=NAME,
description=description,
system_prompt=system_prompt,
tools=tools,
interrupt_on=interrupt_on,
model=model,
extra_middleware=extra_middleware,
)

View file

@ -0,0 +1 @@
Use for external research: find sources on the web, extract evidence, and answer documentation questions.

View file

@ -0,0 +1,53 @@
You are the SurfSense research operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Gather and synthesize evidence using SurfSense research tools with clear citations and uncertainty reporting.
</goal>
<available_tools>
- `web_search`
- `scrape_webpage`
- `search_surfsense_docs`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Prefer primary and recent sources when recency matters.
- If the delegated request is underspecified, return `status=blocked` with the missing research constraints.
- Never fabricate facts, citations, URLs, or quote text.
</tool_policy>
<out_of_scope>
- Do not execute connector mutations (email/calendar/docs/chat writes) or deliverable generation.
</out_of_scope>
<safety>
- Report uncertainty explicitly when evidence is incomplete or conflicting.
- Never present unverified claims as facts.
</safety>
<failure_policy>
- On tool failure, return `status=error` with a concise recovery `next_step`.
- On no useful evidence, return `status=blocked` with recommended narrower filters.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"findings": string[],
"sources": string[],
"confidence": "high" | "medium" | "low"
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -0,0 +1,11 @@
"""Research-stage tools: web search, scrape, and in-product doc search."""
from .scrape_webpage import create_scrape_webpage_tool
from .search_surfsense_docs import create_search_surfsense_docs_tool
from .web_search import create_web_search_tool
__all__ = [
"create_scrape_webpage_tool",
"create_search_surfsense_docs_tool",
"create_web_search_tool",
]

View file

@ -0,0 +1,29 @@
from __future__ import annotations
from typing import Any
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
ToolsPermissions,
)
from .scrape_webpage import create_scrape_webpage_tool
from .search_surfsense_docs import create_search_surfsense_docs_tool
from .web_search import create_web_search_tool
def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions:
resolved_dependencies = {**(dependencies or {}), **kwargs}
web = create_web_search_tool(
search_space_id=resolved_dependencies.get("search_space_id"),
available_connectors=resolved_dependencies.get("available_connectors"),
)
scrape = create_scrape_webpage_tool(firecrawl_api_key=resolved_dependencies.get("firecrawl_api_key"))
docs = create_search_surfsense_docs_tool(db_session=resolved_dependencies["db_session"])
return {
"allow": [
{"name": getattr(web, "name", "") or "", "tool": web},
{"name": getattr(scrape, "name", "") or "", "tool": scrape},
{"name": getattr(docs, "name", "") or "", "tool": docs},
],
"ask": [],
}

View file

@ -0,0 +1,300 @@
"""Scrape pages via WebCrawlerConnector; YouTube URLs use the transcript API instead of HTML crawl."""
import hashlib
import logging
from typing import Any
from urllib.parse import urlparse
import aiohttp
from fake_useragent import UserAgent
from langchain_core.tools import tool
from requests import Session
from youtube_transcript_api import YouTubeTranscriptApi
from app.connectors.webcrawler_connector import WebCrawlerConnector
from app.tasks.document_processors.youtube_processor import get_youtube_video_id
from app.utils.proxy_config import get_requests_proxies
logger = logging.getLogger(__name__)
def extract_domain(url: str) -> str:
"""Extract the domain from a URL."""
try:
parsed = urlparse(url)
domain = parsed.netloc
# Remove 'www.' prefix if present
if domain.startswith("www."):
domain = domain[4:]
return domain
except Exception:
return ""
def generate_scrape_id(url: str) -> str:
"""Generate a unique ID for a scraped webpage."""
hash_val = hashlib.md5(url.encode()).hexdigest()[:12]
return f"scrape-{hash_val}"
def truncate_content(content: str, max_length: int = 50000) -> tuple[str, bool]:
"""
Truncate content to a maximum length.
Returns:
Tuple of (truncated_content, was_truncated)
"""
if len(content) <= max_length:
return content, False
# Try to truncate at a sentence boundary
truncated = content[:max_length]
last_period = truncated.rfind(".")
last_newline = truncated.rfind("\n\n")
# Use the later of the two boundaries, or just truncate
boundary = max(last_period, last_newline)
if boundary > max_length * 0.8: # Only use boundary if it's not too far back
truncated = content[: boundary + 1]
return truncated + "\n\n[Content truncated...]", True
async def _scrape_youtube_video(
url: str, video_id: str, max_length: int
) -> dict[str, Any]:
"""
Fetch YouTube video metadata and transcript via the YouTubeTranscriptApi.
Returns a result dict in the same shape as the regular scrape_webpage output.
"""
scrape_id = generate_scrape_id(url)
domain = "youtube.com"
# --- Video metadata via oEmbed ---
residential_proxies = get_requests_proxies()
params = {
"format": "json",
"url": f"https://www.youtube.com/watch?v={video_id}",
}
oembed_url = "https://www.youtube.com/oembed"
try:
async with (
aiohttp.ClientSession() as http_session,
http_session.get(
oembed_url,
params=params,
proxy=residential_proxies["http"] if residential_proxies else None,
) as response,
):
video_data = await response.json()
except Exception:
video_data = {}
title = video_data.get("title", "YouTube Video")
author = video_data.get("author_name", "Unknown")
# --- Transcript via YouTubeTranscriptApi ---
try:
ua = UserAgent()
http_client = Session()
http_client.headers.update({"User-Agent": ua.random})
if residential_proxies:
http_client.proxies.update(residential_proxies)
ytt_api = YouTubeTranscriptApi(http_client=http_client)
# List all available transcripts and pick the first one
# (the video's primary language) instead of defaulting to English
transcript_list = ytt_api.list(video_id)
transcript = next(iter(transcript_list))
captions = transcript.fetch()
logger.info(
f"[scrape_webpage] Fetched transcript for {video_id} "
f"in {transcript.language} ({transcript.language_code})"
)
transcript_segments = []
for line in captions:
start_time = line.start
duration = line.duration
text = line.text
timestamp = f"[{start_time:.2f}s-{start_time + duration:.2f}s]"
transcript_segments.append(f"{timestamp} {text}")
transcript_text = "\n".join(transcript_segments)
except Exception as e:
logger.warning(f"[scrape_webpage] No transcript for video {video_id}: {e}")
transcript_text = f"No captions available for this video. Error: {e!s}"
# Build combined content
content = f"# {title}\n\n**Author:** {author}\n**Video ID:** {video_id}\n\n## Transcript\n\n{transcript_text}"
# Truncate if needed
content, was_truncated = truncate_content(content, max_length)
word_count = len(content.split())
description = f"YouTube video by {author}"
return {
"id": scrape_id,
"assetId": url,
"kind": "article",
"href": url,
"title": title,
"description": description,
"content": content,
"domain": domain,
"word_count": word_count,
"was_truncated": was_truncated,
"crawler_type": "youtube_transcript",
"author": author,
}
def create_scrape_webpage_tool(firecrawl_api_key: str | None = None):
"""
Factory function to create the scrape_webpage tool.
Args:
firecrawl_api_key: Optional Firecrawl API key for premium web scraping.
Falls back to Chromium/Trafilatura if not provided.
Returns:
A configured tool function for scraping webpages.
"""
@tool
async def scrape_webpage(
url: str,
max_length: int = 50000,
) -> dict[str, Any]:
"""
Scrape and extract the main content from a webpage.
Use this tool when the user wants you to read, summarize, or answer
questions about a specific webpage's content. This tool actually
fetches and reads the full page content. For YouTube video URLs it
fetches the transcript directly instead of crawling the page.
Common triggers:
- "Read this article and summarize it"
- "What does this page say about X?"
- "Summarize this blog post for me"
- "Tell me the key points from this article"
- "What's in this webpage?"
Args:
url: The URL of the webpage to scrape (must be HTTP/HTTPS)
max_length: Maximum content length to return (default: 50000 chars)
Returns:
A dictionary containing:
- id: Unique identifier for this scrape
- assetId: The URL (for deduplication)
- kind: "article" (type of content)
- href: The URL to open when clicked
- title: Page title
- description: Brief description or excerpt
- content: The extracted main content (markdown format)
- domain: The domain name
- word_count: Approximate word count
- was_truncated: Whether content was truncated
- error: Error message (if scraping failed)
"""
scrape_id = generate_scrape_id(url)
domain = extract_domain(url)
# Validate and normalize URL
if not url.startswith(("http://", "https://")):
url = f"https://{url}"
try:
# Check if this is a YouTube URL and use transcript API instead
video_id = get_youtube_video_id(url)
if video_id:
return await _scrape_youtube_video(url, video_id, max_length)
# Create webcrawler connector
connector = WebCrawlerConnector(firecrawl_api_key=firecrawl_api_key)
# Crawl the URL
result, error = await connector.crawl_url(url, formats=["markdown"])
if error:
return {
"id": scrape_id,
"assetId": url,
"kind": "article",
"href": url,
"title": domain or "Webpage",
"domain": domain,
"error": error,
}
if not result:
return {
"id": scrape_id,
"assetId": url,
"kind": "article",
"href": url,
"title": domain or "Webpage",
"domain": domain,
"error": "No content returned from crawler",
}
# Extract content and metadata
content = result.get("content", "")
metadata = result.get("metadata", {})
# Get title from metadata
title = metadata.get("title", "")
if not title:
title = domain or url.split("/")[-1] or "Webpage"
# Get description from metadata
description = metadata.get("description", "")
if not description and content:
# Use first paragraph as description
first_para = content.split("\n\n")[0] if content else ""
description = (
first_para[:300] + "..." if len(first_para) > 300 else first_para
)
# Truncate content if needed
content, was_truncated = truncate_content(content, max_length)
# Calculate word count
word_count = len(content.split())
return {
"id": scrape_id,
"assetId": url,
"kind": "article",
"href": url,
"title": title,
"description": description,
"content": content,
"domain": domain,
"word_count": word_count,
"was_truncated": was_truncated,
"crawler_type": result.get("crawler_type", "unknown"),
"author": metadata.get("author"),
"date": metadata.get("date"),
}
except Exception as e:
error_message = str(e)
logger.error(f"[scrape_webpage] Error scraping {url}: {error_message}")
return {
"id": scrape_id,
"assetId": url,
"kind": "article",
"href": url,
"title": domain or "Webpage",
"domain": domain,
"error": f"Failed to scrape: {error_message[:100]}",
}
return scrape_webpage

View file

@ -0,0 +1,143 @@
"""Semantic search over pre-indexed in-app documentation chunks for user how-to questions."""
import asyncio
import json
from langchain_core.tools import tool
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SurfsenseDocsChunk, SurfsenseDocsDocument
from app.utils.document_converters import embed_text
def format_surfsense_docs_results(results: list[tuple]) -> str:
"""Format (chunk, document) rows as XML with ``doc-`` chunk IDs for citations and UI routing."""
if not results:
return "No relevant Surfsense documentation found for your query."
# Group chunks by document
grouped: dict[int, dict] = {}
for chunk, doc in results:
if doc.id not in grouped:
grouped[doc.id] = {
"document_id": f"doc-{doc.id}",
"document_type": "SURFSENSE_DOCS",
"title": doc.title,
"url": doc.source,
"metadata": {"source": doc.source},
"chunks": [],
}
grouped[doc.id]["chunks"].append(
{
"chunk_id": f"doc-{chunk.id}",
"content": chunk.content,
}
)
# Render XML matching format_documents_for_context structure
parts: list[str] = []
for g in grouped.values():
metadata_json = json.dumps(g["metadata"], ensure_ascii=False)
parts.append("<document>")
parts.append("<document_metadata>")
parts.append(f" <document_id>{g['document_id']}</document_id>")
parts.append(f" <document_type>{g['document_type']}</document_type>")
parts.append(f" <title><![CDATA[{g['title']}]]></title>")
parts.append(f" <url><![CDATA[{g['url']}]]></url>")
parts.append(f" <metadata_json><![CDATA[{metadata_json}]]></metadata_json>")
parts.append("</document_metadata>")
parts.append("")
parts.append("<document_content>")
for ch in g["chunks"]:
parts.append(
f" <chunk id='{ch['chunk_id']}'><![CDATA[{ch['content']}]]></chunk>"
)
parts.append("</document_content>")
parts.append("</document>")
parts.append("")
return "\n".join(parts).strip()
async def search_surfsense_docs_async(
query: str,
db_session: AsyncSession,
top_k: int = 10,
) -> str:
"""
Search Surfsense documentation using vector similarity.
Args:
query: The search query about Surfsense usage
db_session: Database session for executing queries
top_k: Number of results to return
Returns:
Formatted string with relevant documentation content
"""
# Get embedding for the query
query_embedding = await asyncio.to_thread(embed_text, query)
# Vector similarity search on chunks, joining with documents
stmt = (
select(SurfsenseDocsChunk, SurfsenseDocsDocument)
.join(
SurfsenseDocsDocument,
SurfsenseDocsChunk.document_id == SurfsenseDocsDocument.id,
)
.order_by(SurfsenseDocsChunk.embedding.op("<=>")(query_embedding))
.limit(top_k)
)
result = await db_session.execute(stmt)
rows = result.all()
return format_surfsense_docs_results(rows)
def create_search_surfsense_docs_tool(db_session: AsyncSession):
"""
Factory function to create the search_surfsense_docs tool.
Args:
db_session: Database session for executing queries
Returns:
A configured tool function for searching Surfsense documentation
"""
@tool
async def search_surfsense_docs(query: str, top_k: int = 10) -> str:
"""
Search Surfsense documentation for help with using the application.
Use this tool when the user asks questions about:
- How to use Surfsense features
- Installation and setup instructions
- Configuration options and settings
- Troubleshooting common issues
- Available connectors and integrations
- Browser extension usage
- API documentation
This searches the official Surfsense documentation that was indexed
at deployment time. It does NOT search the user's personal knowledge base.
Args:
query: The search query about Surfsense usage or features
top_k: Number of documentation chunks to retrieve (default: 10)
Returns:
Relevant documentation content formatted with chunk IDs for citations
"""
return await search_surfsense_docs_async(
query=query,
db_session=db_session,
top_k=top_k,
)
return search_surfsense_docs

View file

@ -0,0 +1,241 @@
"""Real-time web search: SearXNG plus configured live-search connectors (Tavily, Linkup, Baidu, etc.)."""
import asyncio
import json
import time
from typing import Any
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from app.db import shielded_async_session
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger
_LIVE_SEARCH_CONNECTORS: set[str] = {
"TAVILY_API",
"LINKUP_API",
"BAIDU_SEARCH_API",
}
_LIVE_CONNECTOR_SPECS: dict[str, tuple[str, bool, bool, dict[str, Any]]] = {
"TAVILY_API": ("search_tavily", False, True, {}),
"LINKUP_API": ("search_linkup", False, False, {"mode": "standard"}),
"BAIDU_SEARCH_API": ("search_baidu", False, True, {}),
}
_CONNECTOR_LABELS: dict[str, str] = {
"TAVILY_API": "Tavily",
"LINKUP_API": "Linkup",
"BAIDU_SEARCH_API": "Baidu",
}
class WebSearchInput(BaseModel):
"""Input schema for the web_search tool."""
query: str = Field(
description="The search query to look up on the web. Use specific, descriptive terms.",
)
top_k: int = Field(
default=10,
description="Number of results to retrieve (default: 10, max: 50).",
)
def _format_web_results(
documents: list[dict[str, Any]],
*,
max_chars: int = 50_000,
) -> str:
"""Format web search results into XML suitable for the LLM context."""
if not documents:
return "No web search results found."
parts: list[str] = []
total_chars = 0
for doc in documents:
doc_info = doc.get("document") or {}
metadata = doc_info.get("metadata") or {}
title = doc_info.get("title") or "Web Result"
url = metadata.get("url") or ""
content = (doc.get("content") or "").strip()
source = metadata.get("document_type") or doc.get("source") or "WEB_SEARCH"
if not content:
continue
metadata_json = json.dumps(metadata, ensure_ascii=False)
doc_xml = "\n".join(
[
"<document>",
"<document_metadata>",
f" <document_type>{source}</document_type>",
f" <title><![CDATA[{title}]]></title>",
f" <url><![CDATA[{url}]]></url>",
f" <metadata_json><![CDATA[{metadata_json}]]></metadata_json>",
"</document_metadata>",
"<document_content>",
f" <chunk id='{url}'><![CDATA[{content}]]></chunk>",
"</document_content>",
"</document>",
"",
]
)
if total_chars + len(doc_xml) > max_chars:
parts.append("<!-- Output truncated to fit context window -->")
break
parts.append(doc_xml)
total_chars += len(doc_xml)
return "\n".join(parts).strip() or "No web search results found."
async def _search_live_connector(
connector: str,
query: str,
search_space_id: int,
top_k: int,
semaphore: asyncio.Semaphore,
) -> list[dict[str, Any]]:
"""Dispatch a single live-search connector (Tavily / Linkup / Baidu)."""
perf = get_perf_logger()
spec = _LIVE_CONNECTOR_SPECS.get(connector)
if spec is None:
return []
method_name, _includes_date_range, includes_top_k, extra_kwargs = spec
kwargs: dict[str, Any] = {
"user_query": query,
"search_space_id": search_space_id,
**extra_kwargs,
}
if includes_top_k:
kwargs["top_k"] = top_k
try:
t0 = time.perf_counter()
async with semaphore, shielded_async_session() as session:
svc = ConnectorService(session, search_space_id)
_, chunks = await getattr(svc, method_name)(**kwargs)
perf.info(
"[web_search] connector=%s results=%d in %.3fs",
connector,
len(chunks),
time.perf_counter() - t0,
)
return chunks
except Exception as e:
perf.warning("[web_search] connector=%s FAILED: %s", connector, e)
return []
def create_web_search_tool(
search_space_id: int | None = None,
available_connectors: list[str] | None = None,
) -> StructuredTool:
"""Factory for the ``web_search`` tool.
Dispatches in parallel to the platform SearXNG instance and any
user-configured live-search connectors (Tavily, Linkup, Baidu).
"""
active_live_connectors: list[str] = []
if available_connectors:
active_live_connectors = [
c for c in available_connectors if c in _LIVE_SEARCH_CONNECTORS
]
engine_names = ["SearXNG (platform default)"]
engine_names.extend(_CONNECTOR_LABELS.get(c, c) for c in active_live_connectors)
engines_summary = ", ".join(engine_names)
description = (
"Search the web for real-time information. "
"Use this for current events, news, prices, weather, public facts, or any "
"question that requires up-to-date information from the internet.\n\n"
f"Active search engines: {engines_summary}.\n"
"All configured engines are queried in parallel and results are merged."
)
_search_space_id = search_space_id
_active_live = active_live_connectors
async def _web_search_impl(query: str, top_k: int = 10) -> str:
from app.services import web_search_service
perf = get_perf_logger()
t0 = time.perf_counter()
clamped_top_k = min(max(1, top_k), 50)
semaphore = asyncio.Semaphore(4)
tasks: list[asyncio.Task[list[dict[str, Any]]]] = []
if web_search_service.is_available():
async def _searxng() -> list[dict[str, Any]]:
async with semaphore:
_result_obj, docs = await web_search_service.search(
query=query,
top_k=clamped_top_k,
)
return docs
tasks.append(asyncio.ensure_future(_searxng()))
if _search_space_id is not None:
for connector in _active_live:
tasks.append(
asyncio.ensure_future(
_search_live_connector(
connector=connector,
query=query,
search_space_id=_search_space_id,
top_k=clamped_top_k,
semaphore=semaphore,
)
)
)
if not tasks:
return "Web search is not available — no search engines are configured."
results_lists = await asyncio.gather(*tasks, return_exceptions=True)
all_documents: list[dict[str, Any]] = []
for result in results_lists:
if isinstance(result, BaseException):
perf.warning("[web_search] a search engine failed: %s", result)
continue
all_documents.extend(result)
seen_urls: set[str] = set()
deduplicated: list[dict[str, Any]] = []
for doc in all_documents:
url = ((doc.get("document") or {}).get("metadata") or {}).get("url", "")
if url and url in seen_urls:
continue
if url:
seen_urls.add(url)
deduplicated.append(doc)
formatted = _format_web_results(deduplicated)
perf.info(
"[web_search] query=%r engines=%d results=%d deduped=%d chars=%d in %.3fs",
query[:60],
len(tasks),
len(all_documents),
len(deduplicated),
len(formatted),
time.perf_counter() - t0,
)
return formatted
return StructuredTool(
name="web_search",
description=description,
coroutine=_web_search_impl,
args_schema=WebSearchInput,
)