subagents/knowledge_base: wire KB specialist into orchestrator (renderer/projector split, FS middleware stack, cloud-mode gating)

This commit is contained in:
CREDO23 2026-05-11 20:43:44 +02:00
parent 09fc99c435
commit df2afed18d
10 changed files with 230 additions and 41 deletions

View file

@ -24,4 +24,5 @@ def build_knowledge_priority_mw(
available_connectors=available_connectors,
available_document_types=available_document_types,
mentioned_document_ids=mentioned_document_ids,
inject_system_message=False,
)

View file

@ -20,4 +20,5 @@ def build_knowledge_tree_mw(
search_space_id=search_space_id,
filesystem_mode=filesystem_mode,
llm=llm,
inject_system_message=False,
)

View file

@ -0,0 +1,54 @@
"""Project ``workspace_tree_text`` + ``kb_priority`` from state into SystemMessages."""
from __future__ import annotations
from typing import Any
from langchain.agents.middleware import AgentMiddleware, AgentState
from langchain_core.messages import SystemMessage
from langgraph.runtime import Runtime
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.knowledge_search import _render_priority_message
class KbContextProjectionMiddleware(AgentMiddleware): # type: ignore[type-arg]
"""Emit ``<workspace_tree>`` + ``<priority_documents>`` from shared state.
Read-only consumer: no DB, no LLM, no state writes. The orchestrator's
renderer middlewares populate the source fields; this projection lets any
agent (orchestrator or subagent) put the same content in front of its
own LLM call.
"""
tools = ()
state_schema = SurfSenseFilesystemState
def before_agent( # type: ignore[override]
self,
state: AgentState,
runtime: Runtime[Any],
) -> dict[str, Any] | None:
del runtime
tree_text = state.get("workspace_tree_text")
priority = state.get("kb_priority")
if not tree_text and not priority:
return None
messages = list(state.get("messages") or [])
insert_at = max(len(messages) - 1, 0)
if tree_text:
messages.insert(insert_at, SystemMessage(content=tree_text))
if priority:
messages.insert(insert_at, _render_priority_message(priority))
return {"messages": messages}
def build_kb_context_projection_mw() -> KbContextProjectionMiddleware:
return KbContextProjectionMiddleware()
__all__ = [
"KbContextProjectionMiddleware",
"build_kb_context_projection_mw",
]

View file

@ -19,6 +19,9 @@ from app.agents.multi_agent_chat.subagents import (
from app.agents.multi_agent_chat.subagents.builtins.general_purpose.agent import (
build_subagent as build_general_purpose_subagent,
)
from app.agents.multi_agent_chat.subagents.builtins.knowledge_base.agent import (
build_subagent as build_knowledge_base_subagent,
)
from app.agents.multi_agent_chat.subagents.shared.permissions import ToolsPermissions
from app.agents.new_chat.feature_flags import AgentFeatureFlags
from app.agents.new_chat.filesystem_selection import FilesystemMode
@ -45,6 +48,7 @@ from .shared.anthropic_cache import build_anthropic_cache_mw
from .shared.compaction import build_compaction_mw
from .shared.file_intent import build_file_intent_mw
from .shared.filesystem import build_filesystem_mw
from .shared.kb_context_projection import build_kb_context_projection_mw
from .shared.memory import build_memory_mw
from .shared.patch_tool_calls import build_patch_tool_calls_mw
from .shared.permissions import (
@ -106,6 +110,21 @@ def build_main_agent_deepagent_middleware(
memory_mw=memory_mw,
)
# Cloud-only: KB filesystem operations are delegated to a specialist subagent.
# Desktop mode keeps FS on the main agent (see kb_main_strip).
knowledge_base_subagent: SubAgent | None = None
if filesystem_mode == FilesystemMode.CLOUD:
knowledge_base_subagent = build_knowledge_base_subagent(
llm=llm,
backend_resolver=backend_resolver,
filesystem_mode=filesystem_mode,
search_space_id=search_space_id,
user_id=user_id,
thread_id=thread_id,
permissions=permissions,
resilience=resilience,
)
subagents_registry: list[SubAgent] = []
try:
subagent_extras = build_subagent_extras(
@ -132,7 +151,10 @@ def build_main_agent_deepagent_middleware(
)
subagents_registry = []
subagents: list[SubAgent] = [general_purpose_subagent, *subagents_registry]
subagents: list[SubAgent] = [general_purpose_subagent]
if knowledge_base_subagent is not None:
subagents.append(knowledge_base_subagent)
subagents.extend(subagents_registry)
stack: list[Any] = [
build_busy_mutex_mw(flags),
@ -155,6 +177,7 @@ def build_main_agent_deepagent_middleware(
available_document_types=available_document_types,
mentioned_document_ids=mentioned_document_ids,
),
build_kb_context_projection_mw(),
build_file_intent_mw(llm),
build_filesystem_mw(
backend_resolver=backend_resolver,

View file

@ -1,52 +1,117 @@
"""`knowledge_base` route: ``SubAgent`` spec for the SurfSense KB specialist.
The KB subagent owns the `/documents/` workspace: reading, writing, editing,
searching, and organising user documents.
searching, and organising user documents. It shares the orchestrator's
``workspace_tree_text`` and ``kb_priority`` via state and re-emits them as
SystemMessages through the projection middleware (no extra DB / LLM calls).
"""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from typing import Any, cast
from deepagents import SubAgent
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
from langchain_core.language_models import BaseChatModel
from app.agents.multi_agent_chat.middleware.shared.anthropic_cache import (
build_anthropic_cache_mw,
)
from app.agents.multi_agent_chat.middleware.shared.compaction import (
build_compaction_mw,
)
from app.agents.multi_agent_chat.middleware.shared.filesystem import (
build_filesystem_mw,
)
from app.agents.multi_agent_chat.middleware.shared.kb_context_projection import (
build_kb_context_projection_mw,
)
from app.agents.multi_agent_chat.middleware.shared.patch_tool_calls import (
build_patch_tool_calls_mw,
)
from app.agents.multi_agent_chat.middleware.shared.permissions import (
PermissionContext,
)
from app.agents.multi_agent_chat.middleware.shared.resilience import (
ResilienceBundle,
)
from app.agents.multi_agent_chat.middleware.shared.todos import build_todos_mw
from app.agents.multi_agent_chat.subagents.shared.md_file_reader import (
read_md_file,
)
from app.agents.multi_agent_chat.subagents.shared.subagent_builder import (
pack_subagent,
)
from app.agents.new_chat.filesystem_selection import FilesystemMode
from .tools.index import destructive_fs_interrupt_on
NAME = "knowledge_base"
def build_subagent(
*,
dependencies: dict[str, Any],
model: BaseChatModel | None = None,
extra_middleware: Sequence[Any] | None = None,
**_: Any,
llm: BaseChatModel,
backend_resolver: Any,
filesystem_mode: FilesystemMode,
search_space_id: int,
user_id: str | None,
thread_id: int | None,
permissions: PermissionContext,
resilience: ResilienceBundle,
) -> SubAgent:
"""Build the knowledge-base subagent spec.
The FS toolset and SurfSense filesystem middleware land in a follow-up
commit (``kb_middleware``); at this stage ``tools`` is intentionally
empty so the spec is structurally valid but inert.
"""
del dependencies # plumbed for symmetry; no per-route tools at this stage.
"""Deny + resilience inserts encapsulated here so the orchestrator never mutates the list."""
description = read_md_file(__package__, "description").strip()
if not description:
description = (
"Handles knowledge-base reads, writes, edits, and organisation."
)
system_prompt = read_md_file(__package__, "system_prompt").strip()
return pack_subagent(
name=NAME,
description=description,
system_prompt=system_prompt,
tools=[],
model=model,
extra_middleware=extra_middleware,
)
middleware: list[Any] = [
build_todos_mw(),
build_kb_context_projection_mw(),
build_filesystem_mw(
backend_resolver=backend_resolver,
filesystem_mode=filesystem_mode,
search_space_id=search_space_id,
user_id=user_id,
thread_id=thread_id,
),
build_compaction_mw(llm),
build_patch_tool_calls_mw(),
build_anthropic_cache_mw(),
]
if permissions.subagent_deny_mw is not None:
patch_idx = next(
(
i
for i, m in enumerate(middleware)
if isinstance(m, PatchToolCallsMiddleware)
),
len(middleware),
)
middleware.insert(patch_idx, permissions.subagent_deny_mw)
resilience_mws = resilience.as_list()
if resilience_mws:
cache_idx = next(
(
i
for i, m in enumerate(middleware)
if isinstance(m, AnthropicPromptCachingMiddleware)
),
len(middleware),
)
for offset, mw in enumerate(resilience_mws):
middleware.insert(cache_idx + offset, mw)
spec: dict[str, Any] = {
"name": NAME,
"description": description,
"system_prompt": system_prompt,
"model": llm,
"tools": [],
"middleware": middleware,
"interrupt_on": destructive_fs_interrupt_on(),
}
return cast(SubAgent, spec)

View file

@ -0,0 +1 @@
"""Route-local tool policy for the ``knowledge_base`` subagent."""

View file

@ -0,0 +1,30 @@
"""Route-local FS tool policy.
The KB subagent's actual ``BaseTool`` instances are provided at runtime by
``SurfSenseFilesystemMiddleware`` (mounted in ``agent.py``). This module only
carries policy that the subagent spec needs to declare up front which
destructive ops require explicit user confirmation via ``interrupt_on``.
Mirrors the ``desktop_safety`` ruleset in
``multi_agent_chat.middleware.shared.permissions.context``: in desktop mode
those rules guard the main-agent FS toolset; in cloud mode the same toolset
lives on the KB subagent and the same policy is enforced here instead.
"""
from __future__ import annotations
DESTRUCTIVE_FS_OPS: tuple[str, ...] = (
"rm",
"rmdir",
"move_file",
"edit_file",
"write_file",
)
def destructive_fs_interrupt_on() -> dict[str, bool]:
"""Fresh ``interrupt_on`` dict for the KB subagent spec."""
return {op: True for op in DESTRUCTIVE_FS_OPS}
__all__ = ["DESTRUCTIVE_FS_OPS", "destructive_fs_interrupt_on"]

View file

@ -17,6 +17,7 @@ extra fields needed to implement Postgres-backed virtual filesystem semantics:
* ``kb_matched_chunk_ids`` internal hand-off for matched-chunk highlighting.
* ``kb_anon_doc`` Redis-loaded anonymous document (if any).
* ``tree_version`` bumped by persistence; invalidates the tree render cache.
* ``workspace_tree_text`` pre-rendered ``<workspace_tree>`` body for the turn.
Tools mutate these fields ONLY via ``Command(update=...)`` returns; the
reducers in :mod:`app.agents.new_chat.state_reducers` handle merging.
@ -168,6 +169,9 @@ class SurfSenseFilesystemState(FilesystemState):
tree_version: NotRequired[Annotated[int, _replace_reducer]]
"""Monotonically increasing counter; bumped when commits change the KB tree."""
workspace_tree_text: NotRequired[Annotated[str, _replace_reducer]]
"""Pre-rendered ``<workspace_tree>`` body; shared with subagents to skip re-render."""
__all__ = [
"KbAnonDoc",

View file

@ -584,6 +584,7 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
available_document_types: list[str] | None = None,
top_k: int = 10,
mentioned_document_ids: list[int] | None = None,
inject_system_message: bool = True, # For backwards compatibility
) -> None:
self.llm = llm
self.search_space_id = search_space_id
@ -592,6 +593,7 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
self.available_document_types = available_document_types
self.top_k = top_k
self.mentioned_document_ids = mentioned_document_ids or []
self.inject_system_message = inject_system_message
# Build the kb-planner private Runnable ONCE here so we don't pay
# the ``create_agent`` compile cost (50-200ms) on every turn.
# Disabled by default behind ``enable_kb_planner_runnable``; when
@ -772,14 +774,16 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
"mentioned": True,
}
]
new_messages = list(state.get("messages") or [])
insert_at = max(len(new_messages) - 1, 0)
new_messages.insert(insert_at, _render_priority_message(priority))
return {
update: dict[str, Any] = {
"kb_priority": priority,
"kb_matched_chunk_ids": {},
"messages": new_messages,
}
if self.inject_system_message:
new_messages = list(state.get("messages") or [])
insert_at = max(len(new_messages) - 1, 0)
new_messages.insert(insert_at, _render_priority_message(priority))
update["messages"] = new_messages
return update
async def _authenticated_priority(
self,
@ -876,10 +880,6 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
priority, matched_chunk_ids = await self._materialize_priority(merged)
new_messages = list(messages)
insert_at = max(len(new_messages) - 1, 0)
new_messages.insert(insert_at, _render_priority_message(priority))
_perf_log.info(
"[kb_priority] completed in %.3fs query=%r priority=%d mentioned=%d",
asyncio.get_event_loop().time() - t0,
@ -888,11 +888,16 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
len(mentioned_results),
)
return {
update: dict[str, Any] = {
"kb_priority": priority,
"kb_matched_chunk_ids": matched_chunk_ids,
"messages": new_messages,
}
if self.inject_system_message:
new_messages = list(messages)
insert_at = max(len(new_messages) - 1, 0)
new_messages.insert(insert_at, _render_priority_message(priority))
update["messages"] = new_messages
return update
async def _materialize_priority(
self, merged: list[dict[str, Any]]

View file

@ -105,12 +105,14 @@ class KnowledgeTreeMiddleware(AgentMiddleware): # type: ignore[type-arg]
llm: BaseChatModel | None = None,
max_entries: int = MAX_TREE_ENTRIES,
max_tokens: int = MAX_TREE_TOKENS,
inject_system_message: bool = True, # For backwards compatibility
) -> None:
self.search_space_id = search_space_id
self.filesystem_mode = filesystem_mode
self.llm = llm
self.max_entries = max_entries
self.max_tokens = max_tokens
self.inject_system_message = inject_system_message
self._cache: dict[tuple[int, int, bool], str] = {}
async def abefore_agent( # type: ignore[override]
@ -132,10 +134,13 @@ class KnowledgeTreeMiddleware(AgentMiddleware): # type: ignore[type-arg]
else:
tree_msg = await self._render_kb_tree(state)
messages = list(state.get("messages") or [])
insert_at = max(len(messages) - 1, 0)
messages.insert(insert_at, SystemMessage(content=tree_msg))
update["messages"] = messages
update["workspace_tree_text"] = tree_msg
if self.inject_system_message:
messages = list(state.get("messages") or [])
insert_at = max(len(messages) - 1, 0)
messages.insert(insert_at, SystemMessage(content=tree_msg))
update["messages"] = messages
return update
def before_agent( # type: ignore[override]