web-search: register results on the citation registry (Channel B -> A)

web_search now registers each result as a WEB_RESULT (locator {url}) and
renders a <web_results> block of <document view="excerpt"> [n] passages,
returning Command(update={messages, citation_registry}) like
search_knowledge_base. Collapse the duplicate research-subagent web_search
into the shared tool and teach the prompts to cite web hits with [n].
This commit is contained in:
CREDO23 2026-06-25 15:26:51 +02:00
parent c98bdea5cf
commit 49d675c065
8 changed files with 218 additions and 303 deletions

View file

@ -4,7 +4,10 @@
facts, anything outside SurfSense docs and the workspace KB. Reach for
it whenever freshness matters or you'd otherwise guess from memory.
- Don't refuse with "I lack network access" — call the tool.
- Returns a `<web_results>` block: each result is labelled `[n]`. Cite a
result by writing that `[n]` after the statement it supports (when
citations are enabled) — do not hand-write the URL as a markdown link.
- If results are thin, say so and offer to refine the query.
- Args: `query`, `top_k` (default 10, max 50).
- Follow up with `scrape_webpage` on the best URL when snippets are too
shallow. Present sources with `[label](url)` markdown links.
shallow.

View file

@ -7,6 +7,9 @@ from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.chat.multi_agent_chat.shared.middleware.citation_state import (
build_citation_state_mw,
)
from app.agents.chat.multi_agent_chat.subagents.shared.md_file_reader import (
read_md_file,
)
@ -31,6 +34,12 @@ def build_subagent(
or "Handles research tasks for this workspace."
)
system_prompt = read_md_file(__package__, "system_prompt").strip()
# web_search registers WEB_RESULT citations via Command(update=...); the
# citation-state middleware declares the channel so those [n] merge back up.
middleware_with_citations = {
**(middleware_stack or {}),
"citation_state": build_citation_state_mw(),
}
return pack_subagent(
name=NAME,
description=description,
@ -39,5 +48,5 @@ def build_subagent(
ruleset=RULESET,
dependencies=dependencies,
model=model,
middleware_stack=middleware_stack,
middleware_stack=middleware_with_citations,
)

View file

@ -17,6 +17,16 @@ Gather and synthesize evidence using SurfSense research tools with clear citatio
- Never fabricate facts, citations, URLs, or quote text.
</tool_policy>
<citations>
`web_search` returns a `<web_results>` block whose results are each prefixed with a bracketed label — `[1]`, `[2]`, `[3]`. That `[n]` is the citation label. When a finding came from a specific result, append its `[n]` to that finding, copying the label **exactly** as shown. The caller relays these labels verbatim and the server resolves each one, so a wrong number silently breaks the citation.
- Use the exact `[n]` shown next to the result you actually used; never renumber, guess, or invent a label.
- Before emitting an `[n]`, confirm that bracketed label appears in the `web_search` output this turn. If you can't see it, omit it.
- Write the bare label `[n]` only — no `[citation:…]` wrapper, no markdown links.
- Several results behind one finding → each in its own brackets with nothing between: `[1][2]`.
- `scrape_webpage` returns raw page text with no `[n]` labels; a fact drawn only from a scrape carries no citation (report the URL in `evidence.sources` instead).
</citations>
<out_of_scope>
- Do not execute connector mutations (email/calendar/docs/chat writes) or deliverable generation.
</out_of_scope>
@ -47,6 +57,6 @@ Return **only** one JSON object (no markdown/prose):
}
<include snippet="output_contract_base"/>
Route-specific rules:
- `evidence.findings`: max 10 entries, each a single sentence stating one distinct fact. Do not paste raw paragraphs, scraped pages, or quote blocks.
- `evidence.sources`: max 10 URLs, one per finding when applicable. List each URL once.
- `evidence.findings`: max 10 entries, each a single sentence stating one distinct fact. Append the supporting `[n]` to each finding drawn from a `web_search` result. Do not paste raw paragraphs, scraped pages, or quote blocks.
- `evidence.sources`: max 10 URLs, one per finding when applicable. List each URL once. (Citations travel as `[n]`; `sources` is for transparency and for scrape-only facts that carry no `[n]`.)
</output_contract>

View file

@ -1,7 +1,8 @@
"""Research-stage tools: web search and scrape."""
"""Research-stage tools: web search (shared) and scrape."""
from app.agents.chat.shared.tools.web_search import create_web_search_tool
from .scrape_webpage import create_scrape_webpage_tool
from .web_search import create_web_search_tool
__all__ = [
"create_scrape_webpage_tool",

View file

@ -7,9 +7,9 @@ from typing import Any
from langchain_core.tools import BaseTool
from app.agents.chat.multi_agent_chat.shared.permissions import Ruleset
from app.agents.chat.shared.tools.web_search import create_web_search_tool
from .scrape_webpage import create_scrape_webpage_tool
from .web_search import create_web_search_tool
NAME = "research"

View file

@ -1,241 +0,0 @@
"""Real-time web search: SearXNG plus configured live-search connectors (Tavily, Linkup, Baidu, etc.)."""
import asyncio
import json
import time
from typing import Any
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from app.db import shielded_async_session
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger
_LIVE_SEARCH_CONNECTORS: set[str] = {
"TAVILY_API",
"LINKUP_API",
"BAIDU_SEARCH_API",
}
_LIVE_CONNECTOR_SPECS: dict[str, tuple[str, bool, bool, dict[str, Any]]] = {
"TAVILY_API": ("search_tavily", False, True, {}),
"LINKUP_API": ("search_linkup", False, False, {"mode": "standard"}),
"BAIDU_SEARCH_API": ("search_baidu", False, True, {}),
}
_CONNECTOR_LABELS: dict[str, str] = {
"TAVILY_API": "Tavily",
"LINKUP_API": "Linkup",
"BAIDU_SEARCH_API": "Baidu",
}
class WebSearchInput(BaseModel):
"""Input schema for the web_search tool."""
query: str = Field(
description="The search query to look up on the web. Use specific, descriptive terms.",
)
top_k: int = Field(
default=10,
description="Number of results to retrieve (default: 10, max: 50).",
)
def _format_web_results(
documents: list[dict[str, Any]],
*,
max_chars: int = 50_000,
) -> str:
"""Format web search results into XML suitable for the LLM context."""
if not documents:
return "No web search results found."
parts: list[str] = []
total_chars = 0
for doc in documents:
doc_info = doc.get("document") or {}
metadata = doc_info.get("metadata") or {}
title = doc_info.get("title") or "Web Result"
url = metadata.get("url") or ""
content = (doc.get("content") or "").strip()
source = metadata.get("document_type") or doc.get("source") or "WEB_SEARCH"
if not content:
continue
metadata_json = json.dumps(metadata, ensure_ascii=False)
doc_xml = "\n".join(
[
"<document>",
"<document_metadata>",
f" <document_type>{source}</document_type>",
f" <title><![CDATA[{title}]]></title>",
f" <url><![CDATA[{url}]]></url>",
f" <metadata_json><![CDATA[{metadata_json}]]></metadata_json>",
"</document_metadata>",
"<document_content>",
f" <chunk id='{url}'><![CDATA[{content}]]></chunk>",
"</document_content>",
"</document>",
"",
]
)
if total_chars + len(doc_xml) > max_chars:
parts.append("<!-- Output truncated to fit context window -->")
break
parts.append(doc_xml)
total_chars += len(doc_xml)
return "\n".join(parts).strip() or "No web search results found."
async def _search_live_connector(
connector: str,
query: str,
search_space_id: int,
top_k: int,
semaphore: asyncio.Semaphore,
) -> list[dict[str, Any]]:
"""Dispatch a single live-search connector (Tavily / Linkup / Baidu)."""
perf = get_perf_logger()
spec = _LIVE_CONNECTOR_SPECS.get(connector)
if spec is None:
return []
method_name, _includes_date_range, includes_top_k, extra_kwargs = spec
kwargs: dict[str, Any] = {
"user_query": query,
"search_space_id": search_space_id,
**extra_kwargs,
}
if includes_top_k:
kwargs["top_k"] = top_k
try:
t0 = time.perf_counter()
async with semaphore, shielded_async_session() as session:
svc = ConnectorService(session, search_space_id)
_, chunks = await getattr(svc, method_name)(**kwargs)
perf.info(
"[web_search] connector=%s results=%d in %.3fs",
connector,
len(chunks),
time.perf_counter() - t0,
)
return chunks
except Exception as e:
perf.warning("[web_search] connector=%s FAILED: %s", connector, e)
return []
def create_web_search_tool(
search_space_id: int | None = None,
available_connectors: list[str] | None = None,
) -> StructuredTool:
"""Factory for the ``web_search`` tool.
Dispatches in parallel to the platform SearXNG instance and any
user-configured live-search connectors (Tavily, Linkup, Baidu).
"""
active_live_connectors: list[str] = []
if available_connectors:
active_live_connectors = [
c for c in available_connectors if c in _LIVE_SEARCH_CONNECTORS
]
engine_names = ["SearXNG (platform default)"]
engine_names.extend(_CONNECTOR_LABELS.get(c, c) for c in active_live_connectors)
engines_summary = ", ".join(engine_names)
description = (
"Search the web for real-time information. "
"Use this for current events, news, prices, weather, public facts, or any "
"question that requires up-to-date information from the internet.\n\n"
f"Active search engines: {engines_summary}.\n"
"All configured engines are queried in parallel and results are merged."
)
_search_space_id = search_space_id
_active_live = active_live_connectors
async def _web_search_impl(query: str, top_k: int = 10) -> str:
from app.services import web_search_service
perf = get_perf_logger()
t0 = time.perf_counter()
clamped_top_k = min(max(1, top_k), 50)
semaphore = asyncio.Semaphore(4)
tasks: list[asyncio.Task[list[dict[str, Any]]]] = []
if web_search_service.is_available():
async def _searxng() -> list[dict[str, Any]]:
async with semaphore:
_result_obj, docs = await web_search_service.search(
query=query,
top_k=clamped_top_k,
)
return docs
tasks.append(asyncio.ensure_future(_searxng()))
if _search_space_id is not None:
for connector in _active_live:
tasks.append(
asyncio.ensure_future(
_search_live_connector(
connector=connector,
query=query,
search_space_id=_search_space_id,
top_k=clamped_top_k,
semaphore=semaphore,
)
)
)
if not tasks:
return "Web search is not available — no search engines are configured."
results_lists = await asyncio.gather(*tasks, return_exceptions=True)
all_documents: list[dict[str, Any]] = []
for result in results_lists:
if isinstance(result, BaseException):
perf.warning("[web_search] a search engine failed: %s", result)
continue
all_documents.extend(result)
seen_urls: set[str] = set()
deduplicated: list[dict[str, Any]] = []
for doc in all_documents:
url = ((doc.get("document") or {}).get("metadata") or {}).get("url", "")
if url and url in seen_urls:
continue
if url:
seen_urls.add(url)
deduplicated.append(doc)
formatted = _format_web_results(deduplicated)
perf.info(
"[web_search] query=%r engines=%d results=%d deduped=%d chars=%d in %.3fs",
query[:60],
len(tasks),
len(all_documents),
len(deduplicated),
len(formatted),
time.perf_counter() - t0,
)
return formatted
return StructuredTool(
name="web_search",
description=description,
coroutine=_web_search_impl,
args_schema=WebSearchInput,
)

View file

@ -4,20 +4,40 @@ Web search tool for the SurfSense agent.
Provides a unified tool for real-time web searches that dispatches to all
configured search engines: the platform SearXNG instance (always available)
plus any user-configured live-search connectors (Tavily, Linkup, Baidu).
Each result is registered into the conversation citation registry as a
``WEB_RESULT`` and rendered with a server-assigned ``[n]`` label, so the model
cites the web exactly like the knowledge base one ``[n]`` spine, no special
web citation form.
"""
import asyncio
import json
import time
from typing import Any
from __future__ import annotations
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
import asyncio
import time
from typing import TYPE_CHECKING, Annotated, Any
from urllib.parse import urlparse
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.db import shielded_async_session
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger
if TYPE_CHECKING:
from app.agents.chat.multi_agent_chat.shared.document_render import (
RenderableDocument,
)
# NOTE: imports from ``app.agents.chat.multi_agent_chat`` are done lazily inside
# the functions below. This module lives under ``app.agents.chat.shared`` but is
# imported during the ``multi_agent_chat`` package's own init cascade (via the
# research subagent); importing that package at module load would re-enter a
# partially-initialized module. Lazy imports break that cycle.
_LIVE_SEARCH_CONNECTORS: set[str] = {
"TAVILY_API",
"LINKUP_API",
@ -37,28 +57,29 @@ _CONNECTOR_LABELS: dict[str, str] = {
}
class WebSearchInput(BaseModel):
"""Input schema for the web_search tool."""
query: str = Field(
description="The search query to look up on the web. Use specific, descriptive terms.",
)
top_k: int = Field(
default=10,
description="Number of results to retrieve (default: 10, max: 50).",
)
def _web_source_label(url: str) -> str:
"""A compact, human-readable source for the ``<document source=…>`` attr."""
domain = urlparse(url).netloc.removeprefix("www.") if url else ""
return f"Web · {domain}" if domain else "Web"
def _format_web_results(
def _to_renderable_web_documents(
documents: list[dict[str, Any]],
*,
max_chars: int = 50_000,
) -> str:
"""Format web search results into XML suitable for the LLM context."""
if not documents:
return "No web search results found."
) -> list[RenderableDocument]:
"""Map raw web results to renderable documents, one passage (the snippet) each.
parts: list[str] = []
A result with no URL is skipped: ``url`` is the citation locator, so without
it the result cannot be registered or resolved.
"""
from app.agents.chat.multi_agent_chat.shared.citations import CitationSourceType
from app.agents.chat.multi_agent_chat.shared.document_render import (
RenderableDocument,
RenderablePassage,
)
renderables: list[RenderableDocument] = []
total_chars = 0
for doc in documents:
@ -67,36 +88,28 @@ def _format_web_results(
title = doc_info.get("title") or "Web Result"
url = metadata.get("url") or ""
content = (doc.get("content") or "").strip()
source = metadata.get("document_type") or doc.get("source") or "WEB_SEARCH"
if not content:
if not content or not url:
continue
metadata_json = json.dumps(metadata, ensure_ascii=False)
doc_xml = "\n".join(
[
"<document>",
"<document_metadata>",
f" <document_type>{source}</document_type>",
f" <title><![CDATA[{title}]]></title>",
f" <url><![CDATA[{url}]]></url>",
f" <metadata_json><![CDATA[{metadata_json}]]></metadata_json>",
"</document_metadata>",
"<document_content>",
f" <chunk id='{url}'><![CDATA[{content}]]></chunk>",
"</document_content>",
"</document>",
"",
]
)
if total_chars + len(doc_xml) > max_chars:
parts.append("<!-- Output truncated to fit context window -->")
total_chars += len(content)
if total_chars > max_chars:
break
parts.append(doc_xml)
total_chars += len(doc_xml)
renderables.append(
RenderableDocument(
title=title,
source=_web_source_label(url),
passages=[
RenderablePassage(
content=content,
locator={"url": url},
source_type=CitationSourceType.WEB_RESULT,
)
],
)
)
return "\n".join(parts).strip() or "No web search results found."
return renderables
async def _search_live_connector(
@ -141,7 +154,7 @@ async def _search_live_connector(
def create_web_search_tool(
search_space_id: int | None = None,
available_connectors: list[str] | None = None,
) -> StructuredTool:
) -> BaseTool:
"""Factory for the ``web_search`` tool.
Dispatches in parallel to the platform SearXNG instance and any
@ -168,7 +181,17 @@ def create_web_search_tool(
_search_space_id = search_space_id
_active_live = active_live_connectors
async def _web_search_impl(query: str, top_k: int = 10) -> str:
async def _web_search_impl(
query: Annotated[
str,
"The search query to look up on the web. Use specific, descriptive terms.",
],
runtime: ToolRuntime,
top_k: Annotated[
int,
"Number of results to retrieve (default: 10, max: 50).",
] = 10,
) -> Command | str:
from app.services import web_search_service
perf = get_perf_logger()
@ -226,22 +249,39 @@ def create_web_search_tool(
seen_urls.add(url)
deduplicated.append(doc)
formatted = _format_web_results(deduplicated)
from app.agents.chat.multi_agent_chat.shared.citations import load_registry
from app.agents.chat.multi_agent_chat.shared.document_render import (
render_web_results,
)
registry = load_registry(getattr(runtime, "state", None))
renderables = _to_renderable_web_documents(deduplicated)
rendered = render_web_results(renderables, registry)
perf.info(
"[web_search] query=%r engines=%d results=%d deduped=%d chars=%d in %.3fs",
"[web_search] query=%r engines=%d results=%d deduped=%d renderable=%d in %.3fs",
query[:60],
len(tasks),
len(all_documents),
len(deduplicated),
len(formatted),
len(renderables),
time.perf_counter() - t0,
)
return formatted
return StructuredTool(
if rendered is None:
return "No web search results found."
return Command(
update={
"messages": [
ToolMessage(content=rendered, tool_call_id=runtime.tool_call_id)
],
"citation_registry": registry,
}
)
return StructuredTool.from_function(
name="web_search",
description=description,
coroutine=_web_search_impl,
args_schema=WebSearchInput,
)

View file

@ -0,0 +1,93 @@
"""Tests for the shared ``web_search`` tool's citable-result adaptation.
The tool's network path (SearXNG + live connectors) is out of scope here; these
cover the pure mapping from raw web results to renderable, citable documents and
the end-to-end registration of ``WEB_RESULT`` ``[n]`` labels.
"""
from __future__ import annotations
import pytest
from app.agents.chat.multi_agent_chat.shared.citations import (
CitationRegistry,
CitationSourceType,
)
from app.agents.chat.multi_agent_chat.shared.document_render import render_web_results
from app.agents.chat.shared.tools.web_search import (
_to_renderable_web_documents,
_web_source_label,
)
pytestmark = pytest.mark.unit
def _raw_result(url: str, title: str, content: str) -> dict:
return {
"document": {"title": title, "metadata": {"url": url}},
"content": content,
}
def test_web_source_label_strips_scheme_and_www() -> None:
assert _web_source_label("https://www.example.com/path") == "Web · example.com"
assert _web_source_label("http://news.site.org/a/b") == "Web · news.site.org"
assert _web_source_label("") == "Web"
def test_adapter_maps_each_result_to_one_web_passage() -> None:
docs = _to_renderable_web_documents(
[
_raw_result("https://a.com/x", "Alpha", "alpha body"),
_raw_result("https://b.com/y", "Beta", "beta body"),
]
)
assert [d.title for d in docs] == ["Alpha", "Beta"]
passages = [p for d in docs for p in d.passages]
assert all(p.source_type is CitationSourceType.WEB_RESULT for p in passages)
assert passages[0].locator == {"url": "https://a.com/x"}
assert passages[0].content == "alpha body"
def test_adapter_skips_results_without_url_or_content() -> None:
docs = _to_renderable_web_documents(
[
_raw_result("", "No URL", "has content"),
_raw_result("https://c.com/z", "Empty", " "),
_raw_result("https://d.com/w", "Good", "real content"),
]
)
assert [d.title for d in docs] == ["Good"]
def test_adapter_truncates_on_char_budget() -> None:
big = "x" * 30
docs = _to_renderable_web_documents(
[
_raw_result("https://a.com", "A", big),
_raw_result("https://b.com", "B", big),
_raw_result("https://c.com", "C", big),
],
max_chars=50,
)
# First fits (30), second crosses 50 and stops the loop.
assert [d.title for d in docs] == ["A"]
def test_end_to_end_registers_web_results_for_citation() -> None:
registry = CitationRegistry()
docs = _to_renderable_web_documents(
[_raw_result("https://example.com/a", "Example", "the answer is 42")]
)
block = render_web_results(docs, registry)
assert block is not None
assert "[1] the answer is 42" in block
entry = registry.resolve(1)
assert entry is not None
assert entry.source_type is CitationSourceType.WEB_RESULT
assert entry.locator == {"url": "https://example.com/a"}