From b22fe012d5c6646524eb789ae4a2930612771ba5 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 15 Mar 2026 04:45:39 +0530 Subject: [PATCH] feat: enhance `search_web` functionality and clarify connector usage - modify the `web_search` tool that integrates multiple live search connectors (Tavily, Linkup, Baidu) alongside the SearXNG platform for real-time information retrieval. - Updated comments to reflect the separation of local knowledge base searches from web searches. - Improved handling of connector types and search logic to ensure accurate routing of queries to the appropriate tools. - Refactored existing code to streamline the search process and enhance performance. --- .../app/agents/new_chat/chat_deepagent.py | 7 +- .../app/agents/new_chat/system_prompt.py | 47 +++-- .../agents/new_chat/tools/knowledge_base.py | 110 +++-------- .../app/agents/new_chat/tools/registry.py | 9 +- .../app/agents/new_chat/tools/web_search.py | 178 +++++++++++++++--- 5 files changed, 222 insertions(+), 129 deletions(-) diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index 0e0281a8c..c247ada61 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -37,12 +37,15 @@ _perf_log = get_perf_logger() # ============================================================================= # Maps SearchSourceConnectorType enum values to the searchable document/connector types -# used by the knowledge_base tool. Some connectors map to different document types. +# used by the knowledge_base and web_search tools. +# Live search connectors (TAVILY_API, LINKUP_API, BAIDU_SEARCH_API) are routed to +# the web_search tool; all others go to search_knowledge_base. _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = { - # Direct mappings (connector type == searchable type) + # Live search connectors (handled by web_search tool) "TAVILY_API": "TAVILY_API", "LINKUP_API": "LINKUP_API", "BAIDU_SEARCH_API": "BAIDU_SEARCH_API", + # Local/indexed connectors (handled by search_knowledge_base tool) "SLACK_CONNECTOR": "SLACK_CONNECTOR", "TEAMS_CONNECTOR": "TEAMS_CONNECTOR", "NOTION_CONNECTOR": "NOTION_CONNECTOR", diff --git a/surfsense_backend/app/agents/new_chat/system_prompt.py b/surfsense_backend/app/agents/new_chat/system_prompt.py index 30fe62dd8..274896229 100644 --- a/surfsense_backend/app/agents/new_chat/system_prompt.py +++ b/surfsense_backend/app/agents/new_chat/system_prompt.py @@ -99,15 +99,8 @@ _TOOL_INSTRUCTIONS["search_knowledge_base"] = """ - IMPORTANT: When searching for information (meetings, schedules, notes, tasks, etc.), ALWAYS search broadly across ALL sources first by omitting connectors_to_search. The user may store information in various places including calendar apps, note-taking apps (Obsidian, Notion), chat apps (Slack, Discord), and more. - - IMPORTANT (REAL-TIME / PUBLIC WEB QUERIES): For questions that require current public web data - (e.g., live exchange rates, stock prices, breaking news, weather, current events), you MUST call - `search_knowledge_base` using live web connectors via `connectors_to_search`. - Use whichever of these live connectors are available: ["LINKUP_API", "TAVILY_API", "SEARXNG_API", "BAIDU_SEARCH_API"]. - Only connectors listed in the tool's available connector enums section will actually return results. - - For these real-time/public web queries, DO NOT answer from memory and DO NOT say you lack internet - access before attempting a live connector search. - - If the live connectors return no relevant results, explain that live web sources did not return enough - data and ask the user if they want you to retry with a refined query. + - This tool searches ONLY local/indexed data (uploaded files, Notion, Slack, browser extension captures, etc.). + For real-time web search (current events, news, live data), use the `web_search` tool instead. - FALLBACK BEHAVIOR: If the search returns no relevant results, you MAY then answer using your own general knowledge, but clearly indicate that no matching information was found in the knowledge base. - Only narrow to specific connectors if the user explicitly asks (e.g., "check my Slack" or "in my calendar"). @@ -272,6 +265,24 @@ _TOOL_INSTRUCTIONS["scrape_webpage"] = """ * Don't show every image - just the most relevant 1-3 images that enhance understanding. """ +_TOOL_INSTRUCTIONS["web_search"] = """ +- web_search: Search the web for real-time information using all configured search engines. + - Use this for current events, news, prices, weather, public facts, or any question requiring + up-to-date information from the internet. + - This tool dispatches to all configured search engines (SearXNG, Tavily, Linkup, Baidu) in + parallel and merges the results. + - IMPORTANT (REAL-TIME / PUBLIC WEB QUERIES): For questions that require current public web data + (e.g., live exchange rates, stock prices, breaking news, weather, current events), you MUST call + `web_search` instead of answering from memory. + - For these real-time/public web queries, DO NOT answer from memory and DO NOT say you lack internet + access before attempting a web search. + - If the search returns no relevant results, explain that web sources did not return enough + data and ask the user if they want you to retry with a refined query. + - Args: + - query: The search query - use specific, descriptive terms + - top_k: Number of results to retrieve (default: 10, max: 50) +""" + # Memory tool instructions have private and shared variants. # We store them keyed as "save_memory" / "recall_memory" with sub-keys. _MEMORY_TOOL_INSTRUCTIONS: dict[str, dict[str, str]] = { @@ -402,7 +413,7 @@ _TOOL_EXAMPLES["search_knowledge_base"] = """ - User: "Check my Obsidian notes for meeting notes" - Call: `search_knowledge_base(query="meeting notes", connectors_to_search=["OBSIDIAN_CONNECTOR"])` - User: "search me current usd to inr rate" - - Call: `search_knowledge_base(query="current USD to INR exchange rate", connectors_to_search=["LINKUP_API", "TAVILY_API", "SEARXNG_API", "BAIDU_SEARCH_API"])` + - Call: `web_search(query="current USD to INR exchange rate")` - Then answer using the returned live web results with citations. """ @@ -472,10 +483,21 @@ _TOOL_EXAMPLES["generate_image"] = """ - Step 2: `display_image(src="", alt="Bean Dream coffee shop logo", title="Generated Image")` """ +_TOOL_EXAMPLES["web_search"] = """ +- User: "What's the current USD to INR exchange rate?" + - Call: `web_search(query="current USD to INR exchange rate")` + - Then answer using the returned web results with citations. +- User: "What's the latest news about AI?" + - Call: `web_search(query="latest AI news today")` +- User: "What's the weather in New York?" + - Call: `web_search(query="weather New York today")` +""" + # All tool names that have prompt instructions (order matters for prompt readability) _ALL_TOOL_NAMES_ORDERED = [ "search_surfsense_docs", "search_knowledge_base", + "web_search", "generate_podcast", "generate_report", "link_preview", @@ -596,11 +618,10 @@ The documents you receive are structured like this: -**Live web search results (URL chunk IDs):** +**Web search results (URL chunk IDs):** - TAVILY_API::Some Title::https://example.com/article - TAVILY_API + WEB_SEARCH <![CDATA[Some web search result]]> diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py index 56a4defff..45fdddb9d 100644 --- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py +++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py @@ -23,8 +23,8 @@ from app.db import shielded_async_session from app.services.connector_service import ConnectorService from app.utils.perf import get_perf_logger -# Connectors that call external live-search APIs (no local DB / embedding needed). -# These are never filtered by available_document_types. +# Connectors that call external live-search APIs. These are handled by the +# ``web_search`` tool and must be excluded from knowledge-base searches. _LIVE_SEARCH_CONNECTORS: set[str] = { "TAVILY_API", "LINKUP_API", @@ -189,9 +189,6 @@ _ALL_CONNECTORS: list[str] = [ "GOOGLE_DRIVE_FILE", "DISCORD_CONNECTOR", "AIRTABLE_CONNECTOR", - "TAVILY_API", - "LINKUP_API", - "BAIDU_SEARCH_API", "LUMA_CONNECTOR", "NOTE", "BOOKSTACK_CONNECTOR", @@ -225,9 +222,6 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = { "GOOGLE_DRIVE_FILE": "Google Drive files and documents (personal cloud storage)", "DISCORD_CONNECTOR": "Discord server conversations and shared content (personal community)", "AIRTABLE_CONNECTOR": "Airtable records, tables, and database content (personal data)", - "TAVILY_API": "Tavily web search API results (real-time web search)", - "LINKUP_API": "Linkup search API results (web search)", - "BAIDU_SEARCH_API": "Baidu search API results (Chinese web search)", "LUMA_CONNECTOR": "Luma events and meetings", "WEBCRAWLER_CONNECTOR": "Webpages indexed by SurfSense (personally selected websites)", "CRAWLED_URL": "Webpages indexed by SurfSense (personally selected websites)", @@ -265,14 +259,11 @@ def _normalize_connectors( valid_set = ( set(available_connectors) if available_connectors else set(_ALL_CONNECTORS) ) + valid_set -= _LIVE_SEARCH_CONNECTORS if not connectors_to_search: - # Search all available connectors if none specified - return ( - list(available_connectors) - if available_connectors - else list(_ALL_CONNECTORS) - ) + base = list(available_connectors) if available_connectors else list(_ALL_CONNECTORS) + return [c for c in base if c not in _LIVE_SEARCH_CONNECTORS] normalized: list[str] = [] for raw in connectors_to_search: @@ -299,15 +290,10 @@ def _normalize_connectors( out.append(c) # Fallback to all available if nothing matched - return ( - out - if out - else ( - list(available_connectors) - if available_connectors - else list(_ALL_CONNECTORS) - ) - ) + if not out: + base = list(available_connectors) if available_connectors else list(_ALL_CONNECTORS) + return [c for c in base if c not in _LIVE_SEARCH_CONNECTORS] + return out # ============================================================================= @@ -619,13 +605,11 @@ async def search_knowledge_base_async( connectors = _normalize_connectors(connectors_to_search, available_connectors) - # --- Optimization 1: skip local connectors that have zero indexed documents --- + # --- Optimization 1: skip connectors that have zero indexed documents --- if available_document_types: doc_types_set = set(available_document_types) before_count = len(connectors) - connectors = [ - c for c in connectors if c in _LIVE_SEARCH_CONNECTORS or c in doc_types_set - ] + connectors = [c for c in connectors if c in doc_types_set] skipped = before_count - len(connectors) if skipped: perf.info( @@ -660,9 +644,7 @@ async def search_knowledge_base_async( "[kb_search] degenerate query %r detected - falling back to recency browse", query, ) - local_connectors = [c for c in connectors if c not in _LIVE_SEARCH_CONNECTORS] - if not local_connectors: - local_connectors = [None] # type: ignore[list-item] + browse_connectors = connectors if connectors else [None] # type: ignore[list-item] browse_results = await asyncio.gather( *[ @@ -673,7 +655,7 @@ async def search_knowledge_base_async( start_date=resolved_start_date, end_date=resolved_end_date, ) - for c in local_connectors + for c in browse_connectors ] ) for docs in browse_results: @@ -698,65 +680,20 @@ async def search_knowledge_base_async( ) return result - # Specs for live-search connectors (external APIs, no local DB/embedding). - live_connector_specs: dict[str, tuple[str, bool, bool, dict[str, Any]]] = { - "TAVILY_API": ("search_tavily", False, True, {}), - "LINKUP_API": ("search_linkup", False, False, {"mode": "standard"}), - "BAIDU_SEARCH_API": ("search_baidu", False, True, {}), - } - # --- Optimization 2: compute the query embedding once, share across all local searches --- - precomputed_embedding: list[float] | None = None - has_local_connectors = any(c not in _LIVE_SEARCH_CONNECTORS for c in connectors) - if has_local_connectors: - from app.config import config as app_config + from app.config import config as app_config - t_embed = time.perf_counter() - precomputed_embedding = app_config.embedding_model_instance.embed(query) - perf.info( - "[kb_search] shared embedding computed in %.3fs", - time.perf_counter() - t_embed, - ) + t_embed = time.perf_counter() + precomputed_embedding = app_config.embedding_model_instance.embed(query) + perf.info( + "[kb_search] shared embedding computed in %.3fs", + time.perf_counter() - t_embed, + ) max_parallel_searches = 4 semaphore = asyncio.Semaphore(max_parallel_searches) async def _search_one_connector(connector: str) -> list[dict[str, Any]]: - is_live = connector in _LIVE_SEARCH_CONNECTORS - - if is_live: - spec = live_connector_specs.get(connector) - if spec is None: - return [] - method_name, includes_date_range, includes_top_k, extra_kwargs = spec - kwargs: dict[str, Any] = { - "user_query": query, - "search_space_id": search_space_id, - **extra_kwargs, - } - if includes_top_k: - kwargs["top_k"] = top_k - if includes_date_range: - kwargs["start_date"] = resolved_start_date - kwargs["end_date"] = resolved_end_date - - try: - t_conn = time.perf_counter() - async with semaphore, shielded_async_session() as isolated_session: - svc = ConnectorService(isolated_session, search_space_id) - _, chunks = await getattr(svc, method_name)(**kwargs) - perf.info( - "[kb_search] connector=%s results=%d in %.3fs", - connector, - len(chunks), - time.perf_counter() - t_conn, - ) - return chunks - except Exception as e: - perf.warning("[kb_search] connector=%s FAILED: %s", connector, e) - return [] - - # --- Optimization 3: call _combined_rrf_search directly with shared embedding --- try: t_conn = time.perf_counter() async with semaphore, shielded_async_session() as isolated_session: @@ -962,7 +899,9 @@ Focus searches on these types for best results.""" # This is what the LLM sees when deciding whether/how to use the tool dynamic_description = f"""Search the user's personal knowledge base for relevant information. -Use this tool to find documents, notes, files, web pages, and other content that may help answer the user's question. +Use this tool to find documents, notes, files, web pages, and other content the user has indexed. +This searches ONLY local/indexed data (uploaded files, Notion, Slack, browser extension captures, etc.). +For real-time web search (current events, news, live data), use the `web_search` tool instead. IMPORTANT: - Always craft specific, descriptive search queries using natural language keywords. @@ -972,9 +911,6 @@ IMPORTANT: - If the user requests a specific source type (e.g. "my notes", "Slack messages"), pass `connectors_to_search=[...]` using the enums below. - If `connectors_to_search` is omitted/empty, the system will search broadly. - Only connectors that are enabled/configured for this search space are available.{doc_types_info} -- For real-time/public web queries (e.g., current exchange rates, stock prices, breaking news, weather), - explicitly include live web connectors in `connectors_to_search`, prioritizing: - ["LINKUP_API", "TAVILY_API", "BAIDU_SEARCH_API"]. ## Available connector enums for `connectors_to_search` diff --git a/surfsense_backend/app/agents/new_chat/tools/registry.py b/surfsense_backend/app/agents/new_chat/tools/registry.py index ab0d72194..541dcc34d 100644 --- a/surfsense_backend/app/agents/new_chat/tools/registry.py +++ b/surfsense_backend/app/agents/new_chat/tools/registry.py @@ -187,11 +187,14 @@ BUILTIN_TOOLS: list[ToolDefinition] = [ ), requires=[], # firecrawl_api_key is optional ), - # Web search tool — real-time web search via platform SearXNG + # Web search tool — real-time web search via SearXNG + user-configured engines ToolDefinition( name="web_search", - description="Search the web for real-time information using the platform SearXNG instance", - factory=lambda deps: create_web_search_tool(), + description="Search the web for real-time information using configured search engines", + factory=lambda deps: create_web_search_tool( + search_space_id=deps.get("search_space_id"), + available_connectors=deps.get("available_connectors"), + ), requires=[], ), # Surfsense documentation search tool diff --git a/surfsense_backend/app/agents/new_chat/tools/web_search.py b/surfsense_backend/app/agents/new_chat/tools/web_search.py index 057ee8d07..987fb9c80 100644 --- a/surfsense_backend/app/agents/new_chat/tools/web_search.py +++ b/surfsense_backend/app/agents/new_chat/tools/web_search.py @@ -1,10 +1,12 @@ """ -Web search tool backed by the platform SearXNG instance. +Web search tool for the SurfSense agent. -Provides a standalone tool for real-time web searches, separate from the -knowledge base search which handles local/indexed documents. +Provides a unified tool for real-time web searches that dispatches to all +configured search engines: the platform SearXNG instance (always available) +plus any user-configured live-search connectors (Tavily, Linkup, Baidu). """ +import asyncio import json import time from typing import Any @@ -12,8 +14,28 @@ from typing import Any from langchain_core.tools import StructuredTool from pydantic import BaseModel, Field +from app.db import shielded_async_session +from app.services.connector_service import ConnectorService from app.utils.perf import get_perf_logger +_LIVE_SEARCH_CONNECTORS: set[str] = { + "TAVILY_API", + "LINKUP_API", + "BAIDU_SEARCH_API", +} + +_LIVE_CONNECTOR_SPECS: dict[str, tuple[str, bool, bool, dict[str, Any]]] = { + "TAVILY_API": ("search_tavily", False, True, {}), + "LINKUP_API": ("search_linkup", False, False, {"mode": "standard"}), + "BAIDU_SEARCH_API": ("search_baidu", False, True, {}), +} + +_CONNECTOR_LABELS: dict[str, str] = { + "TAVILY_API": "Tavily", + "LINKUP_API": "Linkup", + "BAIDU_SEARCH_API": "Baidu", +} + class WebSearchInput(BaseModel): """Input schema for the web_search tool.""" @@ -27,8 +49,12 @@ class WebSearchInput(BaseModel): ) -def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_000) -> str: - """Format SearXNG results into XML suitable for the LLM context.""" +def _format_web_results( + documents: list[dict[str, Any]], + *, + max_chars: int = 50_000, +) -> str: + """Format web search results into XML suitable for the LLM context.""" if not documents: return "No web search results found." @@ -41,6 +67,7 @@ def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_ title = doc_info.get("title") or "Web Result" url = metadata.get("url") or "" content = (doc.get("content") or "").strip() + source = metadata.get("document_type") or doc.get("source") or "WEB_SEARCH" if not content: continue @@ -48,7 +75,7 @@ def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_ doc_xml = "\n".join([ "", "", - f" SEARXNG_API", + f" {source}", f" <![CDATA[{title}]]>", f" ", f" ", @@ -70,33 +97,141 @@ def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_ return "\n".join(parts).strip() or "No web search results found." -def create_web_search_tool() -> StructuredTool: +async def _search_live_connector( + connector: str, + query: str, + search_space_id: int, + top_k: int, + semaphore: asyncio.Semaphore, +) -> list[dict[str, Any]]: + """Dispatch a single live-search connector (Tavily / Linkup / Baidu).""" + perf = get_perf_logger() + spec = _LIVE_CONNECTOR_SPECS.get(connector) + if spec is None: + return [] + + method_name, _includes_date_range, includes_top_k, extra_kwargs = spec + kwargs: dict[str, Any] = { + "user_query": query, + "search_space_id": search_space_id, + **extra_kwargs, + } + if includes_top_k: + kwargs["top_k"] = top_k + + try: + t0 = time.perf_counter() + async with semaphore, shielded_async_session() as session: + svc = ConnectorService(session, search_space_id) + _, chunks = await getattr(svc, method_name)(**kwargs) + perf.info( + "[web_search] connector=%s results=%d in %.3fs", + connector, + len(chunks), + time.perf_counter() - t0, + ) + return chunks + except Exception as e: + perf.warning("[web_search] connector=%s FAILED: %s", connector, e) + return [] + + +def create_web_search_tool( + search_space_id: int | None = None, + available_connectors: list[str] | None = None, +) -> StructuredTool: """Factory for the ``web_search`` tool. - The tool calls the platform SearXNG service (via ``web_search_service``) - which handles caching, circuit breaking, and retries internally. + Dispatches in parallel to the platform SearXNG instance and any + user-configured live-search connectors (Tavily, Linkup, Baidu). """ + active_live_connectors: list[str] = [] + if available_connectors: + active_live_connectors = [ + c for c in available_connectors if c in _LIVE_SEARCH_CONNECTORS + ] + + engine_names = ["SearXNG (platform default)"] + engine_names.extend( + _CONNECTOR_LABELS.get(c, c) for c in active_live_connectors + ) + engines_summary = ", ".join(engine_names) + + description = ( + "Search the web for real-time information. " + "Use this for current events, news, prices, weather, public facts, or any " + "question that requires up-to-date information from the internet.\n\n" + f"Active search engines: {engines_summary}.\n" + "All configured engines are queried in parallel and results are merged." + ) + + _search_space_id = search_space_id + _active_live = active_live_connectors async def _web_search_impl(query: str, top_k: int = 10) -> str: from app.services import web_search_service perf = get_perf_logger() t0 = time.perf_counter() + clamped_top_k = min(max(1, top_k), 50) - if not web_search_service.is_available(): - return "Web search is not available — SearXNG is not configured on this server." + semaphore = asyncio.Semaphore(4) + tasks: list[asyncio.Task[list[dict[str, Any]]]] = [] - _result_obj, documents = await web_search_service.search( - query=query, - top_k=min(max(1, top_k), 50), - ) + if web_search_service.is_available(): + async def _searxng() -> list[dict[str, Any]]: + async with semaphore: + _result_obj, docs = await web_search_service.search( + query=query, top_k=clamped_top_k, + ) + return docs - formatted = _format_web_results(documents) + tasks.append(asyncio.ensure_future(_searxng())) + + if _search_space_id is not None: + for connector in _active_live: + tasks.append( + asyncio.ensure_future( + _search_live_connector( + connector=connector, + query=query, + search_space_id=_search_space_id, + top_k=clamped_top_k, + semaphore=semaphore, + ) + ) + ) + + if not tasks: + return "Web search is not available — no search engines are configured." + + results_lists = await asyncio.gather(*tasks, return_exceptions=True) + + all_documents: list[dict[str, Any]] = [] + for result in results_lists: + if isinstance(result, BaseException): + perf.warning("[web_search] a search engine failed: %s", result) + continue + all_documents.extend(result) + + seen_urls: set[str] = set() + deduplicated: list[dict[str, Any]] = [] + for doc in all_documents: + url = ((doc.get("document") or {}).get("metadata") or {}).get("url", "") + if url and url in seen_urls: + continue + if url: + seen_urls.add(url) + deduplicated.append(doc) + + formatted = _format_web_results(deduplicated) perf.info( - "[web_search] query=%r results=%d chars=%d in %.3fs", + "[web_search] query=%r engines=%d results=%d deduped=%d chars=%d in %.3fs", query[:60], - len(documents), + len(tasks), + len(all_documents), + len(deduplicated), len(formatted), time.perf_counter() - t0, ) @@ -104,12 +239,7 @@ def create_web_search_tool() -> StructuredTool: return StructuredTool( name="web_search", - description=( - "Search the web for real-time information using the platform SearXNG instance. " - "Use this for current events, news, prices, weather, public facts, or any " - "question that requires up-to-date information from the internet. " - "Results are privacy-focused — all queries are proxied through the server." - ), + description=description, coroutine=_web_search_impl, args_schema=WebSearchInput, )