diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/agent.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/agent.py new file mode 100644 index 000000000..c04330607 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/agent.py @@ -0,0 +1,54 @@ +"""`research` route: ``SubAgent`` spec for deepagents.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from deepagents import SubAgent +from langchain_core.language_models import BaseChatModel + +from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import ( + read_md_file, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, + merge_tools_permissions, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import ( + pack_subagent, +) + +from .tools.index import load_tools + +NAME = "research" + + +def build_subagent( + *, + dependencies: dict[str, Any], + model: BaseChatModel | None = None, + extra_middleware: Sequence[Any] | None = None, + extra_tools_bucket: ToolsPermissions | None = None, +) -> SubAgent: + buckets = load_tools(dependencies=dependencies) + merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket) + tools = [ + row["tool"] + for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"]) + if row.get("tool") is not None + ] + interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")} + description = read_md_file(__package__, "description").strip() + if not description: + description = "Handles research tasks for this workspace." + system_prompt = read_md_file(__package__, "system_prompt").strip() + return pack_subagent( + name=NAME, + description=description, + system_prompt=system_prompt, + tools=tools, + interrupt_on=interrupt_on, + model=model, + extra_middleware=extra_middleware, + ) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/description.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/description.md new file mode 100644 index 000000000..dd2ced3fb --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/description.md @@ -0,0 +1 @@ +Use for external research: find sources on the web, extract evidence, and answer documentation questions. diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/system_prompt.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/system_prompt.md new file mode 100644 index 000000000..cf558db62 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/system_prompt.md @@ -0,0 +1,53 @@ +You are the SurfSense research operations sub-agent. 
+You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
+
+<objective>
+Gather and synthesize evidence using SurfSense research tools with clear citations and uncertainty reporting.
+</objective>
+
+<tools>
+- `web_search`
+- `scrape_webpage`
+- `search_surfsense_docs`
+</tools>
+
+<rules>
+- Use only tools in `<tools>`.
+- Prefer primary and recent sources when recency matters.
+- If the delegated request is underspecified, return `status=blocked` with the missing research constraints.
+- Never fabricate facts, citations, URLs, or quoted text.
+</rules>
+
+<out_of_scope>
+- Do not execute connector mutations (email/calendar/docs/chat writes) or deliverable generation.
+</out_of_scope>
+
+<uncertainty>
+- Report uncertainty explicitly when evidence is incomplete or conflicting.
+- Never present unverified claims as facts.
+</uncertainty>
+
+<error_handling>
+- On tool failure, return `status=error` with a concise recovery `next_step`.
+- On no useful evidence, return `status=blocked` with recommended narrower filters.
+</error_handling>
+
+<output_format>
+Return **only** one JSON object (no markdown/prose):
+{
+  "status": "success" | "partial" | "blocked" | "error",
+  "action_summary": string,
+  "evidence": {
+    "findings": string[],
+    "sources": string[],
+    "confidence": "high" | "medium" | "low"
+  },
+  "next_step": string | null,
+  "missing_fields": string[] | null,
+  "assumptions": string[] | null
+}
+Rules:
+- `status=success` -> `next_step=null`, `missing_fields=null`.
+- `status=partial|blocked|error` -> `next_step` must be non-null.
+- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
+</output_format>
diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/__init__.py
new file mode 100644
index 000000000..414cc96f4
--- /dev/null
+++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/__init__.py
@@ -0,0 +1,11 @@
+"""Research-stage tools: web search, scrape, and in-product doc search."""
+
+from .scrape_webpage import create_scrape_webpage_tool
+from .search_surfsense_docs import create_search_surfsense_docs_tool
+from .web_search import create_web_search_tool
+
+__all__ = [
+    "create_scrape_webpage_tool",
+    "create_search_surfsense_docs_tool",
+    "create_web_search_tool",
+]
diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/index.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/index.py
new file mode 100644
index 000000000..a616ac2dc
--- /dev/null
+++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/index.py
@@ -0,0 +1,29 @@
+from __future__ import annotations
+
+from typing import Any
+
+from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
+    ToolsPermissions,
+)
+
+from .scrape_webpage import create_scrape_webpage_tool
+from .search_surfsense_docs import create_search_surfsense_docs_tool
+from .web_search import create_web_search_tool
+
+
+def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions:
+    resolved_dependencies = {**(dependencies or {}), **kwargs}
+    web = create_web_search_tool(
+        search_space_id=resolved_dependencies.get("search_space_id"),
+        available_connectors=resolved_dependencies.get("available_connectors"),
+    )
+    scrape = create_scrape_webpage_tool(firecrawl_api_key=resolved_dependencies.get("firecrawl_api_key"))
+    docs = 
create_search_surfsense_docs_tool(db_session=resolved_dependencies["db_session"]) + return { + "allow": [ + {"name": getattr(web, "name", "") or "", "tool": web}, + {"name": getattr(scrape, "name", "") or "", "tool": scrape}, + {"name": getattr(docs, "name", "") or "", "tool": docs}, + ], + "ask": [], + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/scrape_webpage.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/scrape_webpage.py new file mode 100644 index 000000000..bb7c8e5a3 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/scrape_webpage.py @@ -0,0 +1,300 @@ +"""Scrape pages via WebCrawlerConnector; YouTube URLs use the transcript API instead of HTML crawl.""" + +import hashlib +import logging +from typing import Any +from urllib.parse import urlparse + +import aiohttp +from fake_useragent import UserAgent +from langchain_core.tools import tool +from requests import Session +from youtube_transcript_api import YouTubeTranscriptApi + +from app.connectors.webcrawler_connector import WebCrawlerConnector +from app.tasks.document_processors.youtube_processor import get_youtube_video_id +from app.utils.proxy_config import get_requests_proxies + +logger = logging.getLogger(__name__) + + +def extract_domain(url: str) -> str: + """Extract the domain from a URL.""" + try: + parsed = urlparse(url) + domain = parsed.netloc + # Remove 'www.' prefix if present + if domain.startswith("www."): + domain = domain[4:] + return domain + except Exception: + return "" + + +def generate_scrape_id(url: str) -> str: + """Generate a unique ID for a scraped webpage.""" + hash_val = hashlib.md5(url.encode()).hexdigest()[:12] + return f"scrape-{hash_val}" + + +def truncate_content(content: str, max_length: int = 50000) -> tuple[str, bool]: + """ + Truncate content to a maximum length. + + Returns: + Tuple of (truncated_content, was_truncated) + """ + if len(content) <= max_length: + return content, False + + # Try to truncate at a sentence boundary + truncated = content[:max_length] + last_period = truncated.rfind(".") + last_newline = truncated.rfind("\n\n") + + # Use the later of the two boundaries, or just truncate + boundary = max(last_period, last_newline) + if boundary > max_length * 0.8: # Only use boundary if it's not too far back + truncated = content[: boundary + 1] + + return truncated + "\n\n[Content truncated...]", True + + +async def _scrape_youtube_video( + url: str, video_id: str, max_length: int +) -> dict[str, Any]: + """ + Fetch YouTube video metadata and transcript via the YouTubeTranscriptApi. + + Returns a result dict in the same shape as the regular scrape_webpage output. 
+ """ + scrape_id = generate_scrape_id(url) + domain = "youtube.com" + + # --- Video metadata via oEmbed --- + residential_proxies = get_requests_proxies() + + params = { + "format": "json", + "url": f"https://www.youtube.com/watch?v={video_id}", + } + oembed_url = "https://www.youtube.com/oembed" + + try: + async with ( + aiohttp.ClientSession() as http_session, + http_session.get( + oembed_url, + params=params, + proxy=residential_proxies["http"] if residential_proxies else None, + ) as response, + ): + video_data = await response.json() + except Exception: + video_data = {} + + title = video_data.get("title", "YouTube Video") + author = video_data.get("author_name", "Unknown") + + # --- Transcript via YouTubeTranscriptApi --- + try: + ua = UserAgent() + http_client = Session() + http_client.headers.update({"User-Agent": ua.random}) + if residential_proxies: + http_client.proxies.update(residential_proxies) + ytt_api = YouTubeTranscriptApi(http_client=http_client) + + # List all available transcripts and pick the first one + # (the video's primary language) instead of defaulting to English + transcript_list = ytt_api.list(video_id) + transcript = next(iter(transcript_list)) + captions = transcript.fetch() + + logger.info( + f"[scrape_webpage] Fetched transcript for {video_id} " + f"in {transcript.language} ({transcript.language_code})" + ) + + transcript_segments = [] + for line in captions: + start_time = line.start + duration = line.duration + text = line.text + timestamp = f"[{start_time:.2f}s-{start_time + duration:.2f}s]" + transcript_segments.append(f"{timestamp} {text}") + transcript_text = "\n".join(transcript_segments) + except Exception as e: + logger.warning(f"[scrape_webpage] No transcript for video {video_id}: {e}") + transcript_text = f"No captions available for this video. Error: {e!s}" + + # Build combined content + content = f"# {title}\n\n**Author:** {author}\n**Video ID:** {video_id}\n\n## Transcript\n\n{transcript_text}" + + # Truncate if needed + content, was_truncated = truncate_content(content, max_length) + word_count = len(content.split()) + + description = f"YouTube video by {author}" + + return { + "id": scrape_id, + "assetId": url, + "kind": "article", + "href": url, + "title": title, + "description": description, + "content": content, + "domain": domain, + "word_count": word_count, + "was_truncated": was_truncated, + "crawler_type": "youtube_transcript", + "author": author, + } + + +def create_scrape_webpage_tool(firecrawl_api_key: str | None = None): + """ + Factory function to create the scrape_webpage tool. + + Args: + firecrawl_api_key: Optional Firecrawl API key for premium web scraping. + Falls back to Chromium/Trafilatura if not provided. + + Returns: + A configured tool function for scraping webpages. + """ + + @tool + async def scrape_webpage( + url: str, + max_length: int = 50000, + ) -> dict[str, Any]: + """ + Scrape and extract the main content from a webpage. + + Use this tool when the user wants you to read, summarize, or answer + questions about a specific webpage's content. This tool actually + fetches and reads the full page content. For YouTube video URLs it + fetches the transcript directly instead of crawling the page. + + Common triggers: + - "Read this article and summarize it" + - "What does this page say about X?" + - "Summarize this blog post for me" + - "Tell me the key points from this article" + - "What's in this webpage?" 
+ + Args: + url: The URL of the webpage to scrape (must be HTTP/HTTPS) + max_length: Maximum content length to return (default: 50000 chars) + + Returns: + A dictionary containing: + - id: Unique identifier for this scrape + - assetId: The URL (for deduplication) + - kind: "article" (type of content) + - href: The URL to open when clicked + - title: Page title + - description: Brief description or excerpt + - content: The extracted main content (markdown format) + - domain: The domain name + - word_count: Approximate word count + - was_truncated: Whether content was truncated + - error: Error message (if scraping failed) + """ + scrape_id = generate_scrape_id(url) + domain = extract_domain(url) + + # Validate and normalize URL + if not url.startswith(("http://", "https://")): + url = f"https://{url}" + + try: + # Check if this is a YouTube URL and use transcript API instead + video_id = get_youtube_video_id(url) + if video_id: + return await _scrape_youtube_video(url, video_id, max_length) + + # Create webcrawler connector + connector = WebCrawlerConnector(firecrawl_api_key=firecrawl_api_key) + + # Crawl the URL + result, error = await connector.crawl_url(url, formats=["markdown"]) + + if error: + return { + "id": scrape_id, + "assetId": url, + "kind": "article", + "href": url, + "title": domain or "Webpage", + "domain": domain, + "error": error, + } + + if not result: + return { + "id": scrape_id, + "assetId": url, + "kind": "article", + "href": url, + "title": domain or "Webpage", + "domain": domain, + "error": "No content returned from crawler", + } + + # Extract content and metadata + content = result.get("content", "") + metadata = result.get("metadata", {}) + + # Get title from metadata + title = metadata.get("title", "") + if not title: + title = domain or url.split("/")[-1] or "Webpage" + + # Get description from metadata + description = metadata.get("description", "") + if not description and content: + # Use first paragraph as description + first_para = content.split("\n\n")[0] if content else "" + description = ( + first_para[:300] + "..." 
if len(first_para) > 300 else first_para + ) + + # Truncate content if needed + content, was_truncated = truncate_content(content, max_length) + + # Calculate word count + word_count = len(content.split()) + + return { + "id": scrape_id, + "assetId": url, + "kind": "article", + "href": url, + "title": title, + "description": description, + "content": content, + "domain": domain, + "word_count": word_count, + "was_truncated": was_truncated, + "crawler_type": result.get("crawler_type", "unknown"), + "author": metadata.get("author"), + "date": metadata.get("date"), + } + + except Exception as e: + error_message = str(e) + logger.error(f"[scrape_webpage] Error scraping {url}: {error_message}") + return { + "id": scrape_id, + "assetId": url, + "kind": "article", + "href": url, + "title": domain or "Webpage", + "domain": domain, + "error": f"Failed to scrape: {error_message[:100]}", + } + + return scrape_webpage diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/search_surfsense_docs.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/search_surfsense_docs.py new file mode 100644 index 000000000..0d702be4c --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/search_surfsense_docs.py @@ -0,0 +1,143 @@ +"""Semantic search over pre-indexed in-app documentation chunks for user how-to questions.""" + +import asyncio +import json + +from langchain_core.tools import tool +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import SurfsenseDocsChunk, SurfsenseDocsDocument +from app.utils.document_converters import embed_text + + +def format_surfsense_docs_results(results: list[tuple]) -> str: + """Format (chunk, document) rows as XML with ``doc-`` chunk IDs for citations and UI routing.""" + if not results: + return "No relevant Surfsense documentation found for your query." + + # Group chunks by document + grouped: dict[int, dict] = {} + for chunk, doc in results: + if doc.id not in grouped: + grouped[doc.id] = { + "document_id": f"doc-{doc.id}", + "document_type": "SURFSENSE_DOCS", + "title": doc.title, + "url": doc.source, + "metadata": {"source": doc.source}, + "chunks": [], + } + grouped[doc.id]["chunks"].append( + { + "chunk_id": f"doc-{chunk.id}", + "content": chunk.content, + } + ) + + # Render XML matching format_documents_for_context structure + parts: list[str] = [] + for g in grouped.values(): + metadata_json = json.dumps(g["metadata"], ensure_ascii=False) + + parts.append("") + parts.append("") + parts.append(f" {g['document_id']}") + parts.append(f" {g['document_type']}") + parts.append(f" <![CDATA[{g['title']}]]>") + parts.append(f" ") + parts.append(f" ") + parts.append("") + parts.append("") + parts.append("") + + for ch in g["chunks"]: + parts.append( + f" " + ) + + parts.append("") + parts.append("") + parts.append("") + + return "\n".join(parts).strip() + + +async def search_surfsense_docs_async( + query: str, + db_session: AsyncSession, + top_k: int = 10, +) -> str: + """ + Search Surfsense documentation using vector similarity. 
+ + Args: + query: The search query about Surfsense usage + db_session: Database session for executing queries + top_k: Number of results to return + + Returns: + Formatted string with relevant documentation content + """ + # Get embedding for the query + query_embedding = await asyncio.to_thread(embed_text, query) + + # Vector similarity search on chunks, joining with documents + stmt = ( + select(SurfsenseDocsChunk, SurfsenseDocsDocument) + .join( + SurfsenseDocsDocument, + SurfsenseDocsChunk.document_id == SurfsenseDocsDocument.id, + ) + .order_by(SurfsenseDocsChunk.embedding.op("<=>")(query_embedding)) + .limit(top_k) + ) + + result = await db_session.execute(stmt) + rows = result.all() + + return format_surfsense_docs_results(rows) + + +def create_search_surfsense_docs_tool(db_session: AsyncSession): + """ + Factory function to create the search_surfsense_docs tool. + + Args: + db_session: Database session for executing queries + + Returns: + A configured tool function for searching Surfsense documentation + """ + + @tool + async def search_surfsense_docs(query: str, top_k: int = 10) -> str: + """ + Search Surfsense documentation for help with using the application. + + Use this tool when the user asks questions about: + - How to use Surfsense features + - Installation and setup instructions + - Configuration options and settings + - Troubleshooting common issues + - Available connectors and integrations + - Browser extension usage + - API documentation + + This searches the official Surfsense documentation that was indexed + at deployment time. It does NOT search the user's personal knowledge base. + + Args: + query: The search query about Surfsense usage or features + top_k: Number of documentation chunks to retrieve (default: 10) + + Returns: + Relevant documentation content formatted with chunk IDs for citations + """ + return await search_surfsense_docs_async( + query=query, + db_session=db_session, + top_k=top_k, + ) + + return search_surfsense_docs diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/web_search.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/web_search.py new file mode 100644 index 000000000..2fe6bd378 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/builtins/research/tools/web_search.py @@ -0,0 +1,241 @@ +"""Real-time web search: SearXNG plus configured live-search connectors (Tavily, Linkup, Baidu, etc.).""" + +import asyncio +import json +import time +from typing import Any + +from langchain_core.tools import StructuredTool +from pydantic import BaseModel, Field + +from app.db import shielded_async_session +from app.services.connector_service import ConnectorService +from app.utils.perf import get_perf_logger + +_LIVE_SEARCH_CONNECTORS: set[str] = { + "TAVILY_API", + "LINKUP_API", + "BAIDU_SEARCH_API", +} + +_LIVE_CONNECTOR_SPECS: dict[str, tuple[str, bool, bool, dict[str, Any]]] = { + "TAVILY_API": ("search_tavily", False, True, {}), + "LINKUP_API": ("search_linkup", False, False, {"mode": "standard"}), + "BAIDU_SEARCH_API": ("search_baidu", False, True, {}), +} + +_CONNECTOR_LABELS: dict[str, str] = { + "TAVILY_API": "Tavily", + "LINKUP_API": "Linkup", + "BAIDU_SEARCH_API": "Baidu", +} + + +class WebSearchInput(BaseModel): + """Input schema for the web_search tool.""" + + query: str = Field( + description="The search query to look up on the web. 
Use specific, descriptive terms.", + ) + top_k: int = Field( + default=10, + description="Number of results to retrieve (default: 10, max: 50).", + ) + + +def _format_web_results( + documents: list[dict[str, Any]], + *, + max_chars: int = 50_000, +) -> str: + """Format web search results into XML suitable for the LLM context.""" + if not documents: + return "No web search results found." + + parts: list[str] = [] + total_chars = 0 + + for doc in documents: + doc_info = doc.get("document") or {} + metadata = doc_info.get("metadata") or {} + title = doc_info.get("title") or "Web Result" + url = metadata.get("url") or "" + content = (doc.get("content") or "").strip() + source = metadata.get("document_type") or doc.get("source") or "WEB_SEARCH" + if not content: + continue + + metadata_json = json.dumps(metadata, ensure_ascii=False) + doc_xml = "\n".join( + [ + "", + "", + f" {source}", + f" <![CDATA[{title}]]>", + f" ", + f" ", + "", + "", + f" ", + "", + "", + "", + ] + ) + + if total_chars + len(doc_xml) > max_chars: + parts.append("") + break + + parts.append(doc_xml) + total_chars += len(doc_xml) + + return "\n".join(parts).strip() or "No web search results found." + + +async def _search_live_connector( + connector: str, + query: str, + search_space_id: int, + top_k: int, + semaphore: asyncio.Semaphore, +) -> list[dict[str, Any]]: + """Dispatch a single live-search connector (Tavily / Linkup / Baidu).""" + perf = get_perf_logger() + spec = _LIVE_CONNECTOR_SPECS.get(connector) + if spec is None: + return [] + + method_name, _includes_date_range, includes_top_k, extra_kwargs = spec + kwargs: dict[str, Any] = { + "user_query": query, + "search_space_id": search_space_id, + **extra_kwargs, + } + if includes_top_k: + kwargs["top_k"] = top_k + + try: + t0 = time.perf_counter() + async with semaphore, shielded_async_session() as session: + svc = ConnectorService(session, search_space_id) + _, chunks = await getattr(svc, method_name)(**kwargs) + perf.info( + "[web_search] connector=%s results=%d in %.3fs", + connector, + len(chunks), + time.perf_counter() - t0, + ) + return chunks + except Exception as e: + perf.warning("[web_search] connector=%s FAILED: %s", connector, e) + return [] + + +def create_web_search_tool( + search_space_id: int | None = None, + available_connectors: list[str] | None = None, +) -> StructuredTool: + """Factory for the ``web_search`` tool. + + Dispatches in parallel to the platform SearXNG instance and any + user-configured live-search connectors (Tavily, Linkup, Baidu). + """ + active_live_connectors: list[str] = [] + if available_connectors: + active_live_connectors = [ + c for c in available_connectors if c in _LIVE_SEARCH_CONNECTORS + ] + + engine_names = ["SearXNG (platform default)"] + engine_names.extend(_CONNECTOR_LABELS.get(c, c) for c in active_live_connectors) + engines_summary = ", ".join(engine_names) + + description = ( + "Search the web for real-time information. " + "Use this for current events, news, prices, weather, public facts, or any " + "question that requires up-to-date information from the internet.\n\n" + f"Active search engines: {engines_summary}.\n" + "All configured engines are queried in parallel and results are merged." 
+ ) + + _search_space_id = search_space_id + _active_live = active_live_connectors + + async def _web_search_impl(query: str, top_k: int = 10) -> str: + from app.services import web_search_service + + perf = get_perf_logger() + t0 = time.perf_counter() + clamped_top_k = min(max(1, top_k), 50) + + semaphore = asyncio.Semaphore(4) + tasks: list[asyncio.Task[list[dict[str, Any]]]] = [] + + if web_search_service.is_available(): + + async def _searxng() -> list[dict[str, Any]]: + async with semaphore: + _result_obj, docs = await web_search_service.search( + query=query, + top_k=clamped_top_k, + ) + return docs + + tasks.append(asyncio.ensure_future(_searxng())) + + if _search_space_id is not None: + for connector in _active_live: + tasks.append( + asyncio.ensure_future( + _search_live_connector( + connector=connector, + query=query, + search_space_id=_search_space_id, + top_k=clamped_top_k, + semaphore=semaphore, + ) + ) + ) + + if not tasks: + return "Web search is not available — no search engines are configured." + + results_lists = await asyncio.gather(*tasks, return_exceptions=True) + + all_documents: list[dict[str, Any]] = [] + for result in results_lists: + if isinstance(result, BaseException): + perf.warning("[web_search] a search engine failed: %s", result) + continue + all_documents.extend(result) + + seen_urls: set[str] = set() + deduplicated: list[dict[str, Any]] = [] + for doc in all_documents: + url = ((doc.get("document") or {}).get("metadata") or {}).get("url", "") + if url and url in seen_urls: + continue + if url: + seen_urls.add(url) + deduplicated.append(doc) + + formatted = _format_web_results(deduplicated) + + perf.info( + "[web_search] query=%r engines=%d results=%d deduped=%d chars=%d in %.3fs", + query[:60], + len(tasks), + len(all_documents), + len(deduplicated), + len(formatted), + time.perf_counter() - t0, + ) + return formatted + + return StructuredTool( + name="web_search", + description=description, + coroutine=_web_search_impl, + args_schema=WebSearchInput, + )