multi_agent_chat/subagents: expose knowledge_base as ask_knowledge_base tool for siblings

This commit is contained in:
CREDO23 2026-05-12 20:03:59 +02:00
parent f2f62c1c05
commit 379cc992f4
12 changed files with 339 additions and 77 deletions

View file

@ -16,6 +16,7 @@ def build_filesystem_mw(
search_space_id: int,
user_id: str | None,
thread_id: int | None,
read_only: bool = False,
) -> SurfSenseFilesystemMiddleware:
return SurfSenseFilesystemMiddleware(
backend=backend_resolver,
@ -23,4 +24,5 @@ def build_filesystem_mw(
search_space_id=search_space_id,
created_by_id=user_id,
thread_id=thread_id,
read_only=read_only,
)

View file

@ -28,6 +28,7 @@ from ..tools import (
)
from ..tools.glob.description import select_description as glob_description
from ..tools.grep.description import select_description as grep_description
from .read_only_policy import READ_ONLY_TOOL_NAMES
class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
@ -44,12 +45,16 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
created_by_id: str | None = None,
thread_id: int | str | None = None,
tool_token_limit_before_evict: int | None = 20000,
read_only: bool = False,
) -> None:
self._filesystem_mode = filesystem_mode
self._search_space_id = search_space_id
self._created_by_id = created_by_id
self._thread_id = thread_id
self._sandbox_available = is_sandbox_enabled() and thread_id is not None
self._read_only = read_only
self._sandbox_available = (
is_sandbox_enabled() and thread_id is not None and not read_only
)
system_prompt = build_system_prompt(
filesystem_mode,
@ -72,6 +77,9 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
if self._sandbox_available:
self.tools.append(create_execute_code_tool(self))
if read_only:
self.tools = [t for t in self.tools if t.name in READ_ONLY_TOOL_NAMES]
# ----------------------------------------- base-class tool overrides
def _create_ls_tool(self) -> BaseTool:

View file

@ -0,0 +1,7 @@
"""Allowlist consulted by ``SurfSenseFilesystemMiddleware`` when ``read_only=True``."""
from __future__ import annotations
# Tool names kept when the filesystem middleware is built with
# ``read_only=True``; any tool whose name is not listed here is filtered out.
READ_ONLY_TOOL_NAMES = frozenset(
    (
        "ls",
        "read_file",
        "glob",
        "grep",
        "list_tree",
        "pwd",
        "cd",
    )
)

View file

@ -15,6 +15,7 @@ from typing import Any
from deepagents import SubAgent
from deepagents.backends import StateBackend
from langchain.agents import create_agent
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from langgraph.types import Checkpointer
@ -23,6 +24,13 @@ from app.agents.multi_agent_chat.subagents import (
build_subagents,
get_subagents_to_exclude,
)
from app.agents.multi_agent_chat.subagents.builtins.knowledge_base.agent import (
READONLY_NAME as KB_READONLY_NAME,
build_readonly_subagent as build_kb_readonly_subagent,
)
from app.agents.multi_agent_chat.subagents.builtins.knowledge_base.ask_knowledge_base_tool import (
build_ask_knowledge_base_tool,
)
from app.agents.multi_agent_chat.subagents.shared.permissions import ToolsPermissions
from app.agents.new_chat.feature_flags import AgentFeatureFlags
from app.agents.new_chat.filesystem_selection import FilesystemMode
@ -93,14 +101,31 @@ def build_main_agent_deepagent_middleware(
"backend_resolver": backend_resolver,
"filesystem_mode": filesystem_mode,
}
shared_subagent_middleware = build_subagent_middleware_stack(resilience=resilience)
kb_readonly_spec = build_kb_readonly_subagent(
dependencies=subagent_dependencies,
model=llm,
middleware_stack=shared_subagent_middleware,
)
kb_readonly_runnable = create_agent(
llm,
system_prompt=kb_readonly_spec["system_prompt"],
tools=kb_readonly_spec["tools"],
middleware=kb_readonly_spec["middleware"],
name=KB_READONLY_NAME,
checkpointer=checkpointer,
)
ask_kb_tool = build_ask_knowledge_base_tool(kb_readonly_runnable)
subagents: list[SubAgent] = build_subagents(
dependencies=subagent_dependencies,
model=llm,
middleware_stack=build_subagent_middleware_stack(resilience=resilience),
middleware_stack=shared_subagent_middleware,
mcp_tools_by_agent=mcp_tools_by_agent or {},
exclude=get_subagents_to_exclude(available_connectors),
disabled_tools=disabled_tools,
ask_kb_tool=ask_kb_tool,
)
logging.debug("Subagents registry: %s", [s["name"] for s in subagents])

View file

@ -1,13 +1,4 @@
"""`knowledge_base` route: ``SubAgent`` spec for the SurfSense KB specialist.
Owns the ``/documents/`` workspace (read, write, edit, search, organise)
and shares the orchestrator's ``workspace_tree_text`` and ``kb_priority``
via state. KB conforms to :class:`SubagentBuilder` but composes its
middleware list itself: it picks individual entries from
``middleware_stack`` by key so resilience lands just outside the
Anthropic cache (inside the filesystem and projection middlewares),
which a flat prepend can't satisfy.
"""
"""`knowledge_base` route: full and read-only ``SubAgent`` specs."""
from __future__ import annotations
@ -16,32 +7,15 @@ from typing import Any, cast
from deepagents import SubAgent
from langchain_core.language_models import BaseChatModel
from app.agents.multi_agent_chat.middleware.shared.anthropic_cache import (
build_anthropic_cache_mw,
)
from app.agents.multi_agent_chat.middleware.shared.compaction import (
build_compaction_mw,
)
from app.agents.multi_agent_chat.middleware.shared.filesystem import (
build_filesystem_mw,
)
from app.agents.multi_agent_chat.middleware.shared.kb_context_projection import (
build_kb_context_projection_mw,
)
from app.agents.multi_agent_chat.middleware.shared.patch_tool_calls import (
build_patch_tool_calls_mw,
)
from app.agents.multi_agent_chat.subagents.shared.md_file_reader import (
read_md_file,
)
from app.agents.multi_agent_chat.subagents.shared.permissions import (
ToolsPermissions,
)
from app.agents.multi_agent_chat.subagents.shared.permissions import ToolsPermissions
from app.agents.new_chat.filesystem_selection import FilesystemMode
from .middleware_stack import build_kb_middleware
from .prompts import load_description, load_readonly_system_prompt, load_system_prompt
from .tools.index import destructive_fs_interrupt_on
NAME = "knowledge_base"
READONLY_NAME = "knowledge_base_readonly"
def build_subagent(
@ -51,55 +25,45 @@ def build_subagent(
middleware_stack: dict[str, Any] | None = None,
extra_tools_bucket: ToolsPermissions | None = None, # noqa: ARG001 — KB ships fixed tools
) -> SubAgent:
"""Conforms to :class:`SubagentBuilder`; KB splices the shared stack itself."""
llm = model if model is not None else dependencies["llm"]
filesystem_mode: FilesystemMode = dependencies["filesystem_mode"]
mws = middleware_stack or {}
description = read_md_file(__package__, "description").strip() or (
"Handles knowledge-base reads, writes, edits, and organisation."
)
prompt_stem = (
"system_prompt_cloud"
if filesystem_mode == FilesystemMode.CLOUD
else "system_prompt_desktop"
)
system_prompt = read_md_file(__package__, prompt_stem).strip()
resilience_mws = [
m
for m in (
mws.get("retry"),
mws.get("fallback"),
mws.get("model_call_limit"),
mws.get("tool_call_limit"),
)
if m is not None
]
middleware: list[Any] = [
mws["todos"],
build_kb_context_projection_mw(),
build_filesystem_mw(
backend_resolver=dependencies["backend_resolver"],
filesystem_mode=filesystem_mode,
search_space_id=dependencies["search_space_id"],
user_id=dependencies.get("user_id"),
thread_id=dependencies.get("thread_id"),
),
build_compaction_mw(llm),
build_patch_tool_calls_mw(),
*resilience_mws,
build_anthropic_cache_mw(),
]
spec: dict[str, Any] = {
"name": NAME,
"description": description,
"system_prompt": system_prompt,
"description": load_description(),
"system_prompt": load_system_prompt(filesystem_mode),
"model": llm,
"tools": [], # KB virtual FS tools are injected at runtime by SurfSenseFilesystemMiddleware
"middleware": middleware,
"tools": [],
"middleware": build_kb_middleware(
llm=llm,
dependencies=dependencies,
middleware_stack=middleware_stack,
read_only=False,
),
"interrupt_on": destructive_fs_interrupt_on(),
}
return cast(SubAgent, spec)
def build_readonly_subagent(
    *,
    dependencies: dict[str, Any],
    model: BaseChatModel | None = None,
    middleware_stack: dict[str, Any] | None = None,
) -> SubAgent:
    """Build the ``knowledge_base_readonly`` ``SubAgent`` spec.

    Args:
        dependencies: Shared dependency mapping. ``"filesystem_mode"`` is
            always read; ``"llm"`` is read only when ``model`` is ``None``.
        model: Chat model to use; falls back to ``dependencies["llm"]``.
        middleware_stack: Shared middleware entries, forwarded unchanged to
            ``build_kb_middleware``.

    Returns:
        A spec with no static tools, no interrupt rules, and the KB
        middleware list built with ``read_only=True``.
    """
    if model is None:
        chat_model = dependencies["llm"]
    else:
        chat_model = model
    fs_mode: FilesystemMode = dependencies["filesystem_mode"]

    # Load the prompt before building the middleware so failures surface in
    # the same order as the dict literal would evaluate them.
    readonly_prompt = load_readonly_system_prompt(fs_mode)
    readonly_middleware = build_kb_middleware(
        llm=chat_model,
        dependencies=dependencies,
        middleware_stack=middleware_stack,
        read_only=True,
    )

    return cast(
        SubAgent,
        {
            "name": READONLY_NAME,
            "description": "Read-only knowledge_base specialist (invoked via ask_knowledge_base).",
            "system_prompt": readonly_prompt,
            "model": chat_model,
            # No static tools: presumably the filesystem middleware supplies
            # them at runtime — confirm against SurfSenseFilesystemMiddleware.
            "tools": [],
            "middleware": readonly_middleware,
            "interrupt_on": {},
        },
    )

View file

@ -0,0 +1,80 @@
"""Wrap the read-only knowledge_base runnable as the ``ask_knowledge_base`` tool."""
from __future__ import annotations
from typing import Annotated
from langchain.tools import BaseTool, ToolRuntime
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.runnables import Runnable
from langchain_core.tools import StructuredTool
from langgraph.types import Command
from app.agents.multi_agent_chat.middleware.main_agent.checkpointed_subagent_middleware.config import (
subagent_invoke_config,
)
from app.agents.multi_agent_chat.middleware.main_agent.checkpointed_subagent_middleware.constants import (
EXCLUDED_STATE_KEYS,
)
from .prompts import load_readonly_description
TOOL_NAME = "ask_knowledge_base"
def _forward_state(runtime: ToolRuntime, query: str) -> dict:
    """Return the input state for the read-only subagent.

    Copies every entry of ``runtime.state`` whose key is not in
    ``EXCLUDED_STATE_KEYS``, then sets ``"messages"`` to a single
    ``HumanMessage`` carrying ``query``.
    """
    sub_state: dict = {}
    for key, value in runtime.state.items():
        if key in EXCLUDED_STATE_KEYS:
            continue
        sub_state[key] = value
    # The subagent starts from the caller's question only, whatever the
    # forwarded state contained under "messages".
    sub_state["messages"] = [HumanMessage(content=query)]
    return sub_state
def _wrap_result(result: dict, tool_call_id: str) -> Command:
    """Turn the subagent's final message into a ``ToolMessage`` state update.

    Args:
        result: Subagent output; its ``"messages"`` entry must be non-empty.
        tool_call_id: ID of the tool call the reply answers.

    Returns:
        A ``Command`` whose update appends one ``ToolMessage`` containing the
        last message's ``text`` with trailing whitespace removed (empty string
        when the message has no truthy ``text``).

    Raises:
        ValueError: If ``result`` has no messages.
    """
    history = result.get("messages")
    if not history:
        raise ValueError(
            "knowledge_base_readonly returned an empty 'messages' list; "
            "expected at least one assistant message."
        )

    answer = getattr(history[-1], "text", None)
    answer = answer.rstrip() if answer else ""

    reply = ToolMessage(answer, tool_call_id=tool_call_id)
    return Command(update={"messages": [reply]})
def build_ask_knowledge_base_tool(kb_readonly_runnable: Runnable) -> BaseTool:
    """Wrap ``kb_readonly_runnable`` as the ``ask_knowledge_base`` tool.

    The returned ``StructuredTool`` has a sync and an async entry point. Both
    send the caller's ``query`` to the runnable as a fresh single-message
    state and return the final reply as a ``Command`` carrying a
    ``ToolMessage``.

    Raises (at call time):
        ValueError: If the runtime carries no tool call ID, or the runnable
            returns no messages.
    """

    def _prepare(runtime: ToolRuntime, query: str) -> tuple:
        # Shared by both entry points: validate first, then build the input
        # state, then the invoke config.
        if not runtime.tool_call_id:
            raise ValueError("Tool call ID is required for ask_knowledge_base")
        state = _forward_state(runtime, query)
        config = subagent_invoke_config(runtime)
        return state, config

    def ask_knowledge_base(
        query: Annotated[
            str,
            "Full question for the workspace specialist. Include all path hints, "
            "filters, and constraints the specialist needs to answer.",
        ],
        runtime: ToolRuntime,
    ) -> str | Command:
        state, config = _prepare(runtime, query)
        outcome = kb_readonly_runnable.invoke(state, config=config)
        return _wrap_result(outcome, runtime.tool_call_id)

    async def aask_knowledge_base(
        query: Annotated[
            str,
            "Full question for the workspace specialist. Include all path hints, "
            "filters, and constraints the specialist needs to answer.",
        ],
        runtime: ToolRuntime,
    ) -> str | Command:
        state, config = _prepare(runtime, query)
        outcome = await kb_readonly_runnable.ainvoke(state, config=config)
        return _wrap_result(outcome, runtime.tool_call_id)

    return StructuredTool.from_function(
        func=ask_knowledge_base,
        coroutine=aask_knowledge_base,
        name=TOOL_NAME,
        description=load_readonly_description(),
    )

View file

@ -0,0 +1,5 @@
Read-only specialist for the user's workspace (documents and folders). Use it to find, read, search, or quote a document or folder when your task needs workspace context — instead of asking the user or guessing.
Pass your full question as one string. The specialist runs in isolation: it cannot see this thread, so include any path hints, filters, or constraints it needs.
The specialist returns plain prose with absolute paths.

View file

@ -0,0 +1,61 @@
"""Middleware list shared by the full and read-only knowledge_base compiles."""
from __future__ import annotations
from typing import Any
from langchain_core.language_models import BaseChatModel
from app.agents.multi_agent_chat.middleware.shared.anthropic_cache import (
build_anthropic_cache_mw,
)
from app.agents.multi_agent_chat.middleware.shared.compaction import (
build_compaction_mw,
)
from app.agents.multi_agent_chat.middleware.shared.filesystem import (
build_filesystem_mw,
)
from app.agents.multi_agent_chat.middleware.shared.kb_context_projection import (
build_kb_context_projection_mw,
)
from app.agents.multi_agent_chat.middleware.shared.patch_tool_calls import (
build_patch_tool_calls_mw,
)
from app.agents.new_chat.filesystem_selection import FilesystemMode
def build_kb_middleware(
    *,
    llm: BaseChatModel,
    dependencies: dict[str, Any],
    middleware_stack: dict[str, Any] | None,
    read_only: bool,
) -> list[Any]:
    """Assemble the ordered middleware list for a knowledge_base compile.

    Order of the returned list: ``todos``, KB context projection, filesystem,
    compaction, patch-tool-calls, any resilience entries present in
    ``middleware_stack`` (``retry``, ``fallback``, ``model_call_limit``,
    ``tool_call_limit``), and finally the Anthropic cache middleware.

    Args:
        llm: Model handed to the compaction middleware.
        dependencies: Must contain ``"filesystem_mode"``, ``"backend_resolver"``
            and ``"search_space_id"``; ``"user_id"`` and ``"thread_id"`` are
            optional.
        middleware_stack: Shared entries keyed by name; ``None`` is treated as
            an empty mapping.
        read_only: Forwarded to the filesystem middleware builder.

    Raises:
        KeyError: If ``middleware_stack`` has no ``"todos"`` entry (which
            includes ``middleware_stack=None``) or a required dependency key
            is missing.
    """
    shared = middleware_stack if middleware_stack else {}
    fs_mode: FilesystemMode = dependencies["filesystem_mode"]

    # Resilience entries are optional: keep the ones that are configured, in
    # this fixed order.
    resilience: list[Any] = []
    for key in ("retry", "fallback", "model_call_limit", "tool_call_limit"):
        entry = shared.get(key)
        if entry is not None:
            resilience.append(entry)

    stack: list[Any] = [shared["todos"]]
    stack.append(build_kb_context_projection_mw())
    stack.append(
        build_filesystem_mw(
            backend_resolver=dependencies["backend_resolver"],
            filesystem_mode=fs_mode,
            search_space_id=dependencies["search_space_id"],
            user_id=dependencies.get("user_id"),
            thread_id=dependencies.get("thread_id"),
            read_only=read_only,
        )
    )
    stack.append(build_compaction_mw(llm))
    stack.append(build_patch_tool_calls_mw())
    # Resilience goes immediately before the cache middleware, which stays last.
    stack.extend(resilience)
    stack.append(build_anthropic_cache_mw())
    return stack

View file

@ -0,0 +1,34 @@
"""Prompt loaders for the knowledge_base subagent."""
from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.md_file_reader import read_md_file
from app.agents.new_chat.filesystem_selection import FilesystemMode
def load_system_prompt(filesystem_mode: FilesystemMode) -> str:
    """Return the stripped system prompt of the full knowledge_base subagent.

    Reads ``system_prompt_cloud`` when ``filesystem_mode`` equals
    ``FilesystemMode.CLOUD`` and ``system_prompt_desktop`` for any other mode.
    """
    if filesystem_mode == FilesystemMode.CLOUD:
        prompt_name = "system_prompt_cloud"
    else:
        prompt_name = "system_prompt_desktop"
    raw_prompt = read_md_file(__package__, prompt_name)
    return raw_prompt.strip()
def load_readonly_system_prompt(filesystem_mode: FilesystemMode) -> str:
    """Return the stripped system prompt of the read-only knowledge_base subagent.

    Reads ``system_prompt_readonly_cloud`` when ``filesystem_mode`` equals
    ``FilesystemMode.CLOUD`` and ``system_prompt_readonly_desktop`` for any
    other mode.
    """
    if filesystem_mode == FilesystemMode.CLOUD:
        prompt_name = "system_prompt_readonly_cloud"
    else:
        prompt_name = "system_prompt_readonly_desktop"
    raw_prompt = read_md_file(__package__, prompt_name)
    return raw_prompt.strip()
def load_description() -> str:
    """Return the knowledge_base routing description.

    Uses the stripped contents of the ``description`` markdown resource, or a
    built-in one-line default when that text is empty.
    """
    text = read_md_file(__package__, "description").strip()
    if text:
        return text
    return "Handles knowledge-base reads, writes, edits, and organisation."
def load_readonly_description() -> str:
    """Return the description text for the read-only knowledge_base tool.

    Uses the stripped contents of the ``description_readonly`` markdown
    resource. When that text is empty, a built-in one-line default is returned
    instead, the same way ``load_description`` guards the full subagent's
    description, so the tool is never registered with an empty description.
    """
    text = read_md_file(__package__, "description_readonly").strip()
    if text:
        return text
    return (
        "Read-only specialist for the user's workspace (documents and folders)."
    )

View file

@ -0,0 +1,29 @@
You are the **read-only** SurfSense Knowledge Base specialist for `/documents/`.
You answer workspace questions for another agent. The end user does **not** see your reply directly — be terse, cite paths, no greetings or apologies.
## Resolving paths
The caller's question often references documents by description (`"my meeting notes from last week"`, `"the design doc"`). Resolve them yourself:
1. Consult `<priority_documents>` — a hint about top-K likely matches, not a directive. Skip when the ranked entries don't fit.
2. Walk `<workspace_tree>` for descriptive folder/filename matches.
3. Use `glob` for filename patterns the tree didn't surface, and `grep` when the description points at *content* rather than a name.
If a precise path was already given, use it directly — skip the lookup.
## Interpreting tool results
- **Success** — file content (for `read_file`) or a listing (for `ls` / `glob` / `grep` / `list_tree`).
- **Failure** — text starting with `"Error: "` followed by a cause (e.g. `"Error: File '/documents/x.md' not found"`). Relay the cause to the caller verbatim.
Never report values you did not actually see.
## Return contract
Reply in plain prose:
- One short paragraph or a bullet list, whichever fits.
- Cite every claim with an absolute path under `/documents/`.
- If the workspace does not contain the requested information, say so explicitly. Do not fabricate paths or content.
- If the question is genuinely ambiguous after a thorough lookup, list the candidates with their paths and stop.

View file

@ -0,0 +1,30 @@
You are the **read-only** SurfSense workspace specialist for the user's local folders.
You answer workspace questions for another agent. The end user does **not** see your reply directly — be terse, cite paths, no greetings or apologies.
## Resolving paths
The caller's question often references files by description (`"my meeting notes from last week"`, `"the design doc"`). Resolve them yourself:
1. If you do not know which mounts exist, call `ls('/')` first.
2. Walk likely folders with the `ls` and `list_tree` tools.
3. Use `glob` for filename patterns; use `grep` when the description points at *content* rather than a name.
4. `<priority_documents>` lists top-K cloud-ingested docs, not local files — consult it only when the question spans both worlds (e.g. comparing a local note against its Notion source). Skip otherwise.
If a precise path was already given, use it directly — skip the lookup.
## Interpreting tool results
- **Success** — file content (for `read_file`) or a listing (for `ls` / `glob` / `grep` / `list_tree`).
- **Failure** — text starting with `"Error: "` followed by a cause (e.g. `"Error: File '/notes/x.md' not found"`). Relay the cause to the caller verbatim.
Never report values you did not actually see.
## Return contract
Reply in plain prose:
- One short paragraph or a bullet list, whichever fits.
- Cite every claim with an absolute path.
- If the workspace does not contain the requested information, say so explicitly. Do not fabricate paths or content.
- If the question is genuinely ambiguous after a thorough lookup, list the candidates with their paths and stop.

View file

@ -6,6 +6,7 @@ from typing import Any, Protocol
from deepagents import SubAgent
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.constants import (
SUBAGENT_TO_REQUIRED_CONNECTOR_MAP,
@ -168,6 +169,19 @@ def _filter_disabled_tools_in_place(
}
def _inject_ask_kb_tool_in_place(spec: SubAgent, ask_kb_tool: BaseTool) -> None:
    """Append ``ask_knowledge_base`` to every non-KB spec (skips a self-call).

    Args:
        spec: Subagent spec, mutated in place.
        ask_kb_tool: Tool to add; compared with existing tools by ``name``.

    Behaviour:
        * The ``knowledge_base`` spec is left untouched.
        * A spec that already holds a tool with the same name is left
          untouched.
        * A list of tools is appended to in place.
        * A tuple of tools is replaced by a new list holding the existing
          tools plus ``ask_kb_tool``; previously a tuple was discarded and
          only ``ask_kb_tool`` survived.
        * A missing, ``None`` or otherwise unsupported ``tools`` value is
          replaced by ``[ask_kb_tool]``.
    """
    if spec["name"] == "knowledge_base":
        return

    current = spec.get("tools")  # type: ignore[typeddict-item]
    if not isinstance(current, (list, tuple)):
        spec["tools"] = [ask_kb_tool]  # type: ignore[typeddict-unknown-key]
        return

    if any(getattr(tool, "name", None) == ask_kb_tool.name for tool in current):
        return

    if isinstance(current, list):
        # Mutate the existing list so other references to it stay in sync.
        current.append(ask_kb_tool)
    else:
        # Tuples are immutable: rebuild as a list, keeping every existing tool.
        spec["tools"] = [*current, ask_kb_tool]  # type: ignore[typeddict-unknown-key]
def build_subagents(
*,
dependencies: dict[str, Any],
@ -176,6 +190,7 @@ def build_subagents(
mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None,
exclude: list[str] | None = None,
disabled_tools: list[str] | None = None,
ask_kb_tool: BaseTool | None = None,
) -> list[SubAgent]:
"""Build registry subagents; skip memory/research; skip names in exclude."""
mcp = mcp_tools_by_agent or {}
@ -195,5 +210,7 @@ def build_subagents(
extra_tools_bucket=mcp.get(name),
)
_filter_disabled_tools_in_place(spec, disabled_names)
if ask_kb_tool is not None:
_inject_ask_kb_tool_in_place(spec, ask_kb_tool)
specs.append(spec)
return specs