feat: enhance search_web functionality and clarify connector usage

- modify the `web_search` tool that integrates multiple live search connectors (Tavily, Linkup, Baidu) alongside the SearXNG platform for real-time information retrieval.
- Updated comments to reflect the separation of local knowledge base searches from web searches.
- Improved handling of connector types and search logic to ensure accurate routing of queries to the appropriate tools.
- Refactored existing code to streamline the search process and enhance performance.
This commit is contained in:
Anish Sarkar 2026-03-15 04:45:39 +05:30
parent fe9e30eda2
commit b22fe012d5
5 changed files with 222 additions and 129 deletions

View file

@ -37,12 +37,15 @@ _perf_log = get_perf_logger()
# =============================================================================
# Maps SearchSourceConnectorType enum values to the searchable document/connector types
# used by the knowledge_base tool. Some connectors map to different document types.
# used by the knowledge_base and web_search tools.
# Live search connectors (TAVILY_API, LINKUP_API, BAIDU_SEARCH_API) are routed to
# the web_search tool; all others go to search_knowledge_base.
_CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
# Direct mappings (connector type == searchable type)
# Live search connectors (handled by web_search tool)
"TAVILY_API": "TAVILY_API",
"LINKUP_API": "LINKUP_API",
"BAIDU_SEARCH_API": "BAIDU_SEARCH_API",
# Local/indexed connectors (handled by search_knowledge_base tool)
"SLACK_CONNECTOR": "SLACK_CONNECTOR",
"TEAMS_CONNECTOR": "TEAMS_CONNECTOR",
"NOTION_CONNECTOR": "NOTION_CONNECTOR",

View file

@ -99,15 +99,8 @@ _TOOL_INSTRUCTIONS["search_knowledge_base"] = """
- IMPORTANT: When searching for information (meetings, schedules, notes, tasks, etc.), ALWAYS search broadly
across ALL sources first by omitting connectors_to_search. The user may store information in various places
including calendar apps, note-taking apps (Obsidian, Notion), chat apps (Slack, Discord), and more.
- IMPORTANT (REAL-TIME / PUBLIC WEB QUERIES): For questions that require current public web data
(e.g., live exchange rates, stock prices, breaking news, weather, current events), you MUST call
`search_knowledge_base` using live web connectors via `connectors_to_search`.
Use whichever of these live connectors are available: ["LINKUP_API", "TAVILY_API", "SEARXNG_API", "BAIDU_SEARCH_API"].
Only connectors listed in the tool's available connector enums section will actually return results.
- For these real-time/public web queries, DO NOT answer from memory and DO NOT say you lack internet
access before attempting a live connector search.
- If the live connectors return no relevant results, explain that live web sources did not return enough
data and ask the user if they want you to retry with a refined query.
- This tool searches ONLY local/indexed data (uploaded files, Notion, Slack, browser extension captures, etc.).
For real-time web search (current events, news, live data), use the `web_search` tool instead.
- FALLBACK BEHAVIOR: If the search returns no relevant results, you MAY then answer using your own
general knowledge, but clearly indicate that no matching information was found in the knowledge base.
- Only narrow to specific connectors if the user explicitly asks (e.g., "check my Slack" or "in my calendar").
@ -272,6 +265,24 @@ _TOOL_INSTRUCTIONS["scrape_webpage"] = """
* Don't show every image - just the most relevant 1-3 images that enhance understanding.
"""
_TOOL_INSTRUCTIONS["web_search"] = """
- web_search: Search the web for real-time information using all configured search engines.
- Use this for current events, news, prices, weather, public facts, or any question requiring
up-to-date information from the internet.
- This tool dispatches to all configured search engines (SearXNG, Tavily, Linkup, Baidu) in
parallel and merges the results.
- IMPORTANT (REAL-TIME / PUBLIC WEB QUERIES): For questions that require current public web data
(e.g., live exchange rates, stock prices, breaking news, weather, current events), you MUST call
`web_search` instead of answering from memory.
- For these real-time/public web queries, DO NOT answer from memory and DO NOT say you lack internet
access before attempting a web search.
- If the search returns no relevant results, explain that web sources did not return enough
data and ask the user if they want you to retry with a refined query.
- Args:
- query: The search query - use specific, descriptive terms
- top_k: Number of results to retrieve (default: 10, max: 50)
"""
# Memory tool instructions have private and shared variants.
# We store them keyed as "save_memory" / "recall_memory" with sub-keys.
_MEMORY_TOOL_INSTRUCTIONS: dict[str, dict[str, str]] = {
@ -402,7 +413,7 @@ _TOOL_EXAMPLES["search_knowledge_base"] = """
- User: "Check my Obsidian notes for meeting notes"
- Call: `search_knowledge_base(query="meeting notes", connectors_to_search=["OBSIDIAN_CONNECTOR"])`
- User: "search me current usd to inr rate"
- Call: `search_knowledge_base(query="current USD to INR exchange rate", connectors_to_search=["LINKUP_API", "TAVILY_API", "SEARXNG_API", "BAIDU_SEARCH_API"])`
- Call: `web_search(query="current USD to INR exchange rate")`
- Then answer using the returned live web results with citations.
"""
@ -472,10 +483,21 @@ _TOOL_EXAMPLES["generate_image"] = """
- Step 2: `display_image(src="<returned_url>", alt="Bean Dream coffee shop logo", title="Generated Image")`
"""
_TOOL_EXAMPLES["web_search"] = """
- User: "What's the current USD to INR exchange rate?"
- Call: `web_search(query="current USD to INR exchange rate")`
- Then answer using the returned web results with citations.
- User: "What's the latest news about AI?"
- Call: `web_search(query="latest AI news today")`
- User: "What's the weather in New York?"
- Call: `web_search(query="weather New York today")`
"""
# All tool names that have prompt instructions (order matters for prompt readability)
_ALL_TOOL_NAMES_ORDERED = [
"search_surfsense_docs",
"search_knowledge_base",
"web_search",
"generate_podcast",
"generate_report",
"link_preview",
@ -596,11 +618,10 @@ The documents you receive are structured like this:
</document_content>
</document>
**Live web search results (URL chunk IDs):**
**Web search results (URL chunk IDs):**
<document>
<document_metadata>
<document_id>TAVILY_API::Some Title::https://example.com/article</document_id>
<document_type>TAVILY_API</document_type>
<document_type>WEB_SEARCH</document_type>
<title><![CDATA[Some web search result]]></title>
<url><![CDATA[https://example.com/article]]></url>
</document_metadata>

View file

@ -23,8 +23,8 @@ from app.db import shielded_async_session
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger
# Connectors that call external live-search APIs (no local DB / embedding needed).
# These are never filtered by available_document_types.
# Connectors that call external live-search APIs. These are handled by the
# ``web_search`` tool and must be excluded from knowledge-base searches.
_LIVE_SEARCH_CONNECTORS: set[str] = {
"TAVILY_API",
"LINKUP_API",
@ -189,9 +189,6 @@ _ALL_CONNECTORS: list[str] = [
"GOOGLE_DRIVE_FILE",
"DISCORD_CONNECTOR",
"AIRTABLE_CONNECTOR",
"TAVILY_API",
"LINKUP_API",
"BAIDU_SEARCH_API",
"LUMA_CONNECTOR",
"NOTE",
"BOOKSTACK_CONNECTOR",
@ -225,9 +222,6 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = {
"GOOGLE_DRIVE_FILE": "Google Drive files and documents (personal cloud storage)",
"DISCORD_CONNECTOR": "Discord server conversations and shared content (personal community)",
"AIRTABLE_CONNECTOR": "Airtable records, tables, and database content (personal data)",
"TAVILY_API": "Tavily web search API results (real-time web search)",
"LINKUP_API": "Linkup search API results (web search)",
"BAIDU_SEARCH_API": "Baidu search API results (Chinese web search)",
"LUMA_CONNECTOR": "Luma events and meetings",
"WEBCRAWLER_CONNECTOR": "Webpages indexed by SurfSense (personally selected websites)",
"CRAWLED_URL": "Webpages indexed by SurfSense (personally selected websites)",
@ -265,14 +259,11 @@ def _normalize_connectors(
valid_set = (
set(available_connectors) if available_connectors else set(_ALL_CONNECTORS)
)
valid_set -= _LIVE_SEARCH_CONNECTORS
if not connectors_to_search:
# Search all available connectors if none specified
return (
list(available_connectors)
if available_connectors
else list(_ALL_CONNECTORS)
)
base = list(available_connectors) if available_connectors else list(_ALL_CONNECTORS)
return [c for c in base if c not in _LIVE_SEARCH_CONNECTORS]
normalized: list[str] = []
for raw in connectors_to_search:
@ -299,15 +290,10 @@ def _normalize_connectors(
out.append(c)
# Fallback to all available if nothing matched
return (
out
if out
else (
list(available_connectors)
if available_connectors
else list(_ALL_CONNECTORS)
)
)
if not out:
base = list(available_connectors) if available_connectors else list(_ALL_CONNECTORS)
return [c for c in base if c not in _LIVE_SEARCH_CONNECTORS]
return out
# =============================================================================
@ -619,13 +605,11 @@ async def search_knowledge_base_async(
connectors = _normalize_connectors(connectors_to_search, available_connectors)
# --- Optimization 1: skip local connectors that have zero indexed documents ---
# --- Optimization 1: skip connectors that have zero indexed documents ---
if available_document_types:
doc_types_set = set(available_document_types)
before_count = len(connectors)
connectors = [
c for c in connectors if c in _LIVE_SEARCH_CONNECTORS or c in doc_types_set
]
connectors = [c for c in connectors if c in doc_types_set]
skipped = before_count - len(connectors)
if skipped:
perf.info(
@ -660,9 +644,7 @@ async def search_knowledge_base_async(
"[kb_search] degenerate query %r detected - falling back to recency browse",
query,
)
local_connectors = [c for c in connectors if c not in _LIVE_SEARCH_CONNECTORS]
if not local_connectors:
local_connectors = [None] # type: ignore[list-item]
browse_connectors = connectors if connectors else [None] # type: ignore[list-item]
browse_results = await asyncio.gather(
*[
@ -673,7 +655,7 @@ async def search_knowledge_base_async(
start_date=resolved_start_date,
end_date=resolved_end_date,
)
for c in local_connectors
for c in browse_connectors
]
)
for docs in browse_results:
@ -698,65 +680,20 @@ async def search_knowledge_base_async(
)
return result
# Specs for live-search connectors (external APIs, no local DB/embedding).
live_connector_specs: dict[str, tuple[str, bool, bool, dict[str, Any]]] = {
"TAVILY_API": ("search_tavily", False, True, {}),
"LINKUP_API": ("search_linkup", False, False, {"mode": "standard"}),
"BAIDU_SEARCH_API": ("search_baidu", False, True, {}),
}
# --- Optimization 2: compute the query embedding once, share across all local searches ---
precomputed_embedding: list[float] | None = None
has_local_connectors = any(c not in _LIVE_SEARCH_CONNECTORS for c in connectors)
if has_local_connectors:
from app.config import config as app_config
from app.config import config as app_config
t_embed = time.perf_counter()
precomputed_embedding = app_config.embedding_model_instance.embed(query)
perf.info(
"[kb_search] shared embedding computed in %.3fs",
time.perf_counter() - t_embed,
)
t_embed = time.perf_counter()
precomputed_embedding = app_config.embedding_model_instance.embed(query)
perf.info(
"[kb_search] shared embedding computed in %.3fs",
time.perf_counter() - t_embed,
)
max_parallel_searches = 4
semaphore = asyncio.Semaphore(max_parallel_searches)
async def _search_one_connector(connector: str) -> list[dict[str, Any]]:
is_live = connector in _LIVE_SEARCH_CONNECTORS
if is_live:
spec = live_connector_specs.get(connector)
if spec is None:
return []
method_name, includes_date_range, includes_top_k, extra_kwargs = spec
kwargs: dict[str, Any] = {
"user_query": query,
"search_space_id": search_space_id,
**extra_kwargs,
}
if includes_top_k:
kwargs["top_k"] = top_k
if includes_date_range:
kwargs["start_date"] = resolved_start_date
kwargs["end_date"] = resolved_end_date
try:
t_conn = time.perf_counter()
async with semaphore, shielded_async_session() as isolated_session:
svc = ConnectorService(isolated_session, search_space_id)
_, chunks = await getattr(svc, method_name)(**kwargs)
perf.info(
"[kb_search] connector=%s results=%d in %.3fs",
connector,
len(chunks),
time.perf_counter() - t_conn,
)
return chunks
except Exception as e:
perf.warning("[kb_search] connector=%s FAILED: %s", connector, e)
return []
# --- Optimization 3: call _combined_rrf_search directly with shared embedding ---
try:
t_conn = time.perf_counter()
async with semaphore, shielded_async_session() as isolated_session:
@ -962,7 +899,9 @@ Focus searches on these types for best results."""
# This is what the LLM sees when deciding whether/how to use the tool
dynamic_description = f"""Search the user's personal knowledge base for relevant information.
Use this tool to find documents, notes, files, web pages, and other content that may help answer the user's question.
Use this tool to find documents, notes, files, web pages, and other content the user has indexed.
This searches ONLY local/indexed data (uploaded files, Notion, Slack, browser extension captures, etc.).
For real-time web search (current events, news, live data), use the `web_search` tool instead.
IMPORTANT:
- Always craft specific, descriptive search queries using natural language keywords.
@ -972,9 +911,6 @@ IMPORTANT:
- If the user requests a specific source type (e.g. "my notes", "Slack messages"), pass `connectors_to_search=[...]` using the enums below.
- If `connectors_to_search` is omitted/empty, the system will search broadly.
- Only connectors that are enabled/configured for this search space are available.{doc_types_info}
- For real-time/public web queries (e.g., current exchange rates, stock prices, breaking news, weather),
explicitly include live web connectors in `connectors_to_search`, prioritizing:
["LINKUP_API", "TAVILY_API", "BAIDU_SEARCH_API"].
## Available connector enums for `connectors_to_search`

View file

@ -187,11 +187,14 @@ BUILTIN_TOOLS: list[ToolDefinition] = [
),
requires=[], # firecrawl_api_key is optional
),
# Web search tool — real-time web search via platform SearXNG
# Web search tool — real-time web search via SearXNG + user-configured engines
ToolDefinition(
name="web_search",
description="Search the web for real-time information using the platform SearXNG instance",
factory=lambda deps: create_web_search_tool(),
description="Search the web for real-time information using configured search engines",
factory=lambda deps: create_web_search_tool(
search_space_id=deps.get("search_space_id"),
available_connectors=deps.get("available_connectors"),
),
requires=[],
),
# Surfsense documentation search tool

View file

@ -1,10 +1,12 @@
"""
Web search tool backed by the platform SearXNG instance.
Web search tool for the SurfSense agent.
Provides a standalone tool for real-time web searches, separate from the
knowledge base search which handles local/indexed documents.
Provides a unified tool for real-time web searches that dispatches to all
configured search engines: the platform SearXNG instance (always available)
plus any user-configured live-search connectors (Tavily, Linkup, Baidu).
"""
import asyncio
import json
import time
from typing import Any
@ -12,8 +14,28 @@ from typing import Any
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from app.db import shielded_async_session
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger
_LIVE_SEARCH_CONNECTORS: set[str] = {
"TAVILY_API",
"LINKUP_API",
"BAIDU_SEARCH_API",
}
_LIVE_CONNECTOR_SPECS: dict[str, tuple[str, bool, bool, dict[str, Any]]] = {
"TAVILY_API": ("search_tavily", False, True, {}),
"LINKUP_API": ("search_linkup", False, False, {"mode": "standard"}),
"BAIDU_SEARCH_API": ("search_baidu", False, True, {}),
}
_CONNECTOR_LABELS: dict[str, str] = {
"TAVILY_API": "Tavily",
"LINKUP_API": "Linkup",
"BAIDU_SEARCH_API": "Baidu",
}
class WebSearchInput(BaseModel):
"""Input schema for the web_search tool."""
@ -27,8 +49,12 @@ class WebSearchInput(BaseModel):
)
def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_000) -> str:
"""Format SearXNG results into XML suitable for the LLM context."""
def _format_web_results(
documents: list[dict[str, Any]],
*,
max_chars: int = 50_000,
) -> str:
"""Format web search results into XML suitable for the LLM context."""
if not documents:
return "No web search results found."
@ -41,6 +67,7 @@ def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_
title = doc_info.get("title") or "Web Result"
url = metadata.get("url") or ""
content = (doc.get("content") or "").strip()
source = metadata.get("document_type") or doc.get("source") or "WEB_SEARCH"
if not content:
continue
@ -48,7 +75,7 @@ def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_
doc_xml = "\n".join([
"<document>",
"<document_metadata>",
f" <document_type>SEARXNG_API</document_type>",
f" <document_type>{source}</document_type>",
f" <title><![CDATA[{title}]]></title>",
f" <url><![CDATA[{url}]]></url>",
f" <metadata_json><![CDATA[{metadata_json}]]></metadata_json>",
@ -70,33 +97,141 @@ def _format_web_results(documents: list[dict[str, Any]], *, max_chars: int = 50_
return "\n".join(parts).strip() or "No web search results found."
def create_web_search_tool() -> StructuredTool:
async def _search_live_connector(
connector: str,
query: str,
search_space_id: int,
top_k: int,
semaphore: asyncio.Semaphore,
) -> list[dict[str, Any]]:
"""Dispatch a single live-search connector (Tavily / Linkup / Baidu)."""
perf = get_perf_logger()
spec = _LIVE_CONNECTOR_SPECS.get(connector)
if spec is None:
return []
method_name, _includes_date_range, includes_top_k, extra_kwargs = spec
kwargs: dict[str, Any] = {
"user_query": query,
"search_space_id": search_space_id,
**extra_kwargs,
}
if includes_top_k:
kwargs["top_k"] = top_k
try:
t0 = time.perf_counter()
async with semaphore, shielded_async_session() as session:
svc = ConnectorService(session, search_space_id)
_, chunks = await getattr(svc, method_name)(**kwargs)
perf.info(
"[web_search] connector=%s results=%d in %.3fs",
connector,
len(chunks),
time.perf_counter() - t0,
)
return chunks
except Exception as e:
perf.warning("[web_search] connector=%s FAILED: %s", connector, e)
return []
def create_web_search_tool(
search_space_id: int | None = None,
available_connectors: list[str] | None = None,
) -> StructuredTool:
"""Factory for the ``web_search`` tool.
The tool calls the platform SearXNG service (via ``web_search_service``)
which handles caching, circuit breaking, and retries internally.
Dispatches in parallel to the platform SearXNG instance and any
user-configured live-search connectors (Tavily, Linkup, Baidu).
"""
active_live_connectors: list[str] = []
if available_connectors:
active_live_connectors = [
c for c in available_connectors if c in _LIVE_SEARCH_CONNECTORS
]
engine_names = ["SearXNG (platform default)"]
engine_names.extend(
_CONNECTOR_LABELS.get(c, c) for c in active_live_connectors
)
engines_summary = ", ".join(engine_names)
description = (
"Search the web for real-time information. "
"Use this for current events, news, prices, weather, public facts, or any "
"question that requires up-to-date information from the internet.\n\n"
f"Active search engines: {engines_summary}.\n"
"All configured engines are queried in parallel and results are merged."
)
_search_space_id = search_space_id
_active_live = active_live_connectors
async def _web_search_impl(query: str, top_k: int = 10) -> str:
from app.services import web_search_service
perf = get_perf_logger()
t0 = time.perf_counter()
clamped_top_k = min(max(1, top_k), 50)
if not web_search_service.is_available():
return "Web search is not available — SearXNG is not configured on this server."
semaphore = asyncio.Semaphore(4)
tasks: list[asyncio.Task[list[dict[str, Any]]]] = []
_result_obj, documents = await web_search_service.search(
query=query,
top_k=min(max(1, top_k), 50),
)
if web_search_service.is_available():
async def _searxng() -> list[dict[str, Any]]:
async with semaphore:
_result_obj, docs = await web_search_service.search(
query=query, top_k=clamped_top_k,
)
return docs
formatted = _format_web_results(documents)
tasks.append(asyncio.ensure_future(_searxng()))
if _search_space_id is not None:
for connector in _active_live:
tasks.append(
asyncio.ensure_future(
_search_live_connector(
connector=connector,
query=query,
search_space_id=_search_space_id,
top_k=clamped_top_k,
semaphore=semaphore,
)
)
)
if not tasks:
return "Web search is not available — no search engines are configured."
results_lists = await asyncio.gather(*tasks, return_exceptions=True)
all_documents: list[dict[str, Any]] = []
for result in results_lists:
if isinstance(result, BaseException):
perf.warning("[web_search] a search engine failed: %s", result)
continue
all_documents.extend(result)
seen_urls: set[str] = set()
deduplicated: list[dict[str, Any]] = []
for doc in all_documents:
url = ((doc.get("document") or {}).get("metadata") or {}).get("url", "")
if url and url in seen_urls:
continue
if url:
seen_urls.add(url)
deduplicated.append(doc)
formatted = _format_web_results(deduplicated)
perf.info(
"[web_search] query=%r results=%d chars=%d in %.3fs",
"[web_search] query=%r engines=%d results=%d deduped=%d chars=%d in %.3fs",
query[:60],
len(documents),
len(tasks),
len(all_documents),
len(deduplicated),
len(formatted),
time.perf_counter() - t0,
)
@ -104,12 +239,7 @@ def create_web_search_tool() -> StructuredTool:
return StructuredTool(
name="web_search",
description=(
"Search the web for real-time information using the platform SearXNG instance. "
"Use this for current events, news, prices, weather, public facts, or any "
"question that requires up-to-date information from the internet. "
"Results are privacy-focused — all queries are proxied through the server."
),
description=description,
coroutine=_web_search_impl,
args_schema=WebSearchInput,
)