Remove unwired multi_agent_chat package.

This commit is contained in:
CREDO23 2026-05-04 21:45:57 +02:00
parent 216a678f1a
commit d675d4df3f
93 changed files with 0 additions and 3697 deletions

View file

@ -1,132 +0,0 @@
"""
Multi-agent chat (LangChain Subagents pattern).
**Layout (SRP)**
- :mod:`expert_agent.builtins` — general categories from the tool registry (research, memory, deliverables not tied to one vendor).
- :mod:`expert_agent.connectors` — external integrations (one subgraph per product where split).
- :mod:`core` — prompts, compiled subgraph helper, delegation, registry subsets, tool-factory kwargs (:mod:`core.bindings`).
- :mod:`routing` — supervisor-facing ``@tool`` routers (domain invoke).
- :mod:`supervisor` — orchestrator graph + ``supervisor_prompt.md``.
- :mod:`integration` — async ``create_multi_agent_chat`` composer (partitions MCP tools into experts).
Documentation:
https://docs.langchain.com/oss/python/langchain/multi-agent
https://docs.langchain.com/oss/python/langchain/multi-agent/subagents
Display name: ``multi-agent-chat``; Python package: ``multi_agent_chat``.
"""
from app.agents.multi_agent_chat.expert_agent.builtins.deliverables import (
build_deliverables_tools,
build_deliverables_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.builtins.memory import (
build_memory_tools,
build_memory_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.builtins.research import (
build_research_tools,
build_research_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.calendar import (
build_calendar_domain_agent,
build_calendar_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.confluence import (
build_confluence_tools,
build_confluence_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.discord import (
build_discord_tools,
build_discord_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.dropbox import (
build_dropbox_tools,
build_dropbox_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.gmail import (
build_gmail_tools,
build_gmail_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.google_drive import (
build_google_drive_tools,
build_google_drive_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.luma import (
build_luma_tools,
build_luma_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.notion import (
build_notion_tools,
build_notion_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.onedrive import (
build_onedrive_tools,
build_onedrive_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.teams import (
build_teams_tools,
build_teams_domain_agent,
)
from app.agents.multi_agent_chat.core import (
REGISTRY_ROUTING_CATEGORY_KEYS,
TOOL_NAMES_BY_CATEGORY,
build_domain_agent,
build_registry_dependencies,
build_registry_tools_for_category,
compose_child_task,
connector_binding,
extract_last_assistant_text,
read_prompt_md,
)
from app.agents.multi_agent_chat.integration import create_multi_agent_chat
from app.agents.multi_agent_chat.routing import (
DomainRoutingSpec,
build_supervisor_routing_tools,
routing_tools_from_specs,
)
from app.agents.multi_agent_chat.supervisor import build_supervisor_agent
# Public API of the ``multi_agent_chat`` package.
# Kept strictly alphabetical (case-sensitive, so constant/class names sort
# first) — the original list was mostly-but-not-quite sorted (e.g.
# ``build_calendar_tools`` was stranded away from its sibling), which makes
# omissions and duplicates hard to spot in review.
__all__ = [
    "DomainRoutingSpec",
    "REGISTRY_ROUTING_CATEGORY_KEYS",
    "TOOL_NAMES_BY_CATEGORY",
    "build_calendar_domain_agent",
    "build_calendar_tools",
    "build_confluence_domain_agent",
    "build_confluence_tools",
    "build_deliverables_domain_agent",
    "build_deliverables_tools",
    "build_discord_domain_agent",
    "build_discord_tools",
    "build_domain_agent",
    "build_dropbox_domain_agent",
    "build_dropbox_tools",
    "build_gmail_domain_agent",
    "build_gmail_tools",
    "build_google_drive_domain_agent",
    "build_google_drive_tools",
    "build_luma_domain_agent",
    "build_luma_tools",
    "build_memory_domain_agent",
    "build_memory_tools",
    "build_notion_domain_agent",
    "build_notion_tools",
    "build_onedrive_domain_agent",
    "build_onedrive_tools",
    "build_registry_dependencies",
    "build_registry_tools_for_category",
    "build_research_domain_agent",
    "build_research_tools",
    "build_supervisor_agent",
    "build_supervisor_routing_tools",
    "build_teams_domain_agent",
    "build_teams_tools",
    "compose_child_task",
    "connector_binding",
    "create_multi_agent_chat",
    "extract_last_assistant_text",
    "read_prompt_md",
    "routing_tools_from_specs",
]

View file

@ -1,25 +0,0 @@
"""Cross-cutting building blocks (prompts, agents, delegation, registry) — not domain logic."""
from app.agents.multi_agent_chat.core.agents import build_domain_agent
from app.agents.multi_agent_chat.core.bindings import connector_binding
from app.agents.multi_agent_chat.core.delegation import compose_child_task
from app.agents.multi_agent_chat.core.invocation import extract_last_assistant_text
from app.agents.multi_agent_chat.core.prompts import read_prompt_md
from app.agents.multi_agent_chat.core.registry import (
REGISTRY_ROUTING_CATEGORY_KEYS,
TOOL_NAMES_BY_CATEGORY,
build_registry_dependencies,
build_registry_tools_for_category,
)
# Public API of :mod:`core` — mirrors exactly the names re-exported above.
__all__ = [
    "REGISTRY_ROUTING_CATEGORY_KEYS",
    "TOOL_NAMES_BY_CATEGORY",
    "build_domain_agent",
    "build_registry_dependencies",
    "build_registry_tools_for_category",
    "compose_child_task",
    "connector_binding",
    "extract_last_assistant_text",
    "read_prompt_md",
]

View file

@ -1,5 +0,0 @@
"""Compiled subgraph factories shared by domain slices."""
from app.agents.multi_agent_chat.core.agents.domain_graph import build_domain_agent
__all__ = ["build_domain_agent"]

View file

@ -1,27 +0,0 @@
"""Compile a domain LangGraph agent from a co-located prompt + tool list."""
from __future__ import annotations
from collections.abc import Sequence
from langchain.agents import create_agent
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.prompts import read_prompt_md
def build_domain_agent(
    llm: BaseChatModel,
    tools: Sequence[BaseTool],
    *,
    prompt_package: str,
    prompt_stem: str = "domain_prompt",
):
    """Compile a LangGraph agent whose system prompt is ``{prompt_stem}.md``.

    The markdown file is loaded from ``prompt_package`` (the slice's own
    package), so prompt and agent stay co-located.
    """
    prompt_text = read_prompt_md(prompt_package, prompt_stem)
    tool_list = list(tools)
    return create_agent(llm, system_prompt=prompt_text, tools=tool_list)

View file

@ -1,5 +0,0 @@
"""Search-space / DB kwargs shared by main-chat tool factories (distinct from ``expert_agent.connectors`` integrations)."""
from app.agents.multi_agent_chat.core.bindings.binding import connector_binding
__all__ = ["connector_binding"]

View file

@ -1,18 +0,0 @@
"""Shared kwargs dict for main-chat tool factories (DB session + search space + user)."""
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
def connector_binding(
    *,
    db_session: AsyncSession,
    search_space_id: int,
    user_id: str,
) -> dict[str, AsyncSession | int | str]:
    """Bundle the kwargs shared by every main-chat tool factory.

    Keyword-only on purpose so call sites stay explicit about which value is
    the DB session, which the search space, and which the user.
    """
    binding: dict[str, AsyncSession | int | str] = dict(
        db_session=db_session,
        search_space_id=search_space_id,
        user_id=user_id,
    )
    return binding

View file

@ -1,5 +0,0 @@
"""Supervisor → domain message shaping."""
from app.agents.multi_agent_chat.core.delegation.child_task import compose_child_task
__all__ = ["compose_child_task"]

View file

@ -1,22 +0,0 @@
"""Fold orchestrator-selected context into the single user message sent to a domain agent."""
from __future__ import annotations
def compose_child_task(task: str, *, curated_context: str | None = None) -> str:
    """Build the single user message sent to a domain agent.

    The task is always wrapped in ``<delegated_task>`` tags. When
    ``curated_context`` carries non-whitespace text (supervisor/KB wiring),
    it is prepended in ``<delegated_context>`` tags so the child sees only
    what orchestration selected, never the full parent transcript.
    """
    cleaned_task = task.strip()
    task_block = f"<delegated_task>\n{cleaned_task}\n</delegated_task>"
    context = (curated_context or "").strip()
    if not context:
        return task_block
    context_block = f"<delegated_context>\n{context}\n</delegated_context>"
    return f"{context_block}\n\n{task_block}"

View file

@ -1,5 +0,0 @@
"""Parsing LangGraph invoke results."""
from app.agents.multi_agent_chat.core.invocation.output import extract_last_assistant_text
__all__ = ["extract_last_assistant_text"]

View file

@ -1,17 +0,0 @@
"""Extract displayable text from a LangGraph agent ``invoke`` / ``ainvoke`` result."""
from __future__ import annotations
from typing import Any
def extract_last_assistant_text(result: dict[str, Any]) -> str:
    """Return displayable text from the last message of an agent result.

    Handles the content shapes LangChain messages use in practice:
    - ``str`` content is returned as-is;
    - ``list`` content (content blocks) has its textual parts concatenated
      (plain strings and ``{"text": ...}`` dicts; non-text blocks skipped) —
      previously this shape fell through to ``str(last)``, yielding the
      message's repr instead of its text;
    - anything else falls back to ``str(last)``.

    Returns ``""`` when the result carries no messages.
    """
    messages = result.get("messages") or []
    if not messages:
        return ""
    last = messages[-1]
    content = getattr(last, "content", None)
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Content-blocks shape: keep only the textual pieces, in order.
        parts = [
            part if isinstance(part, str) else part["text"]
            for part in content
            if isinstance(part, str)
            or (isinstance(part, dict) and isinstance(part.get("text"), str))
        ]
        if parts:
            return "".join(parts)
    return str(last)

View file

@ -1,135 +0,0 @@
"""Partition MCP tools onto multi-agent expert routes (read-only; does not change the MCP loader).
Uses the same connector discovery shape as ``load_mcp_tools`` (copied query below). Tools come from
``app.agents.new_chat.tools.mcp_tool.load_mcp_tools``; routing uses metadata already set there:
- HTTP tools: ``metadata["mcp_connector_id"]`` DB connector row expert route.
- stdio tools: no connector id on the tool; ``metadata["mcp_connector_name"]`` is looked up in a connector-name map
  (duplicate names: last row wins — rare in practice).
"""
from __future__ import annotations
import logging
from collections import defaultdict
from collections.abc import Sequence
from typing import Any
from langchain_core.tools import BaseTool
from sqlalchemy import cast, select
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSourceConnector
# Module logger — unroutable tools are logged and skipped, never raised on.
logger = logging.getLogger(__name__)
# SurfSense ``SearchSourceConnectorType`` string → supervisor routing key (must match
# ``DomainRoutingSpec.tool_name`` values used in ``supervisor_routing``).
# Note: native and Composio variants of the same product map onto one route.
_CONNECTOR_TYPE_TO_EXPERT_ROUTE: dict[str, str] = {
    "GOOGLE_GMAIL_CONNECTOR": "gmail",
    "COMPOSIO_GMAIL_CONNECTOR": "gmail",
    "GOOGLE_CALENDAR_CONNECTOR": "calendar",
    "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "calendar",
    "DISCORD_CONNECTOR": "discord",
    "TEAMS_CONNECTOR": "teams",
    "LUMA_CONNECTOR": "luma",
    "LINEAR_CONNECTOR": "linear",
    "JIRA_CONNECTOR": "jira",
    "CLICKUP_CONNECTOR": "clickup",
    "SLACK_CONNECTOR": "slack",
    "AIRTABLE_CONNECTOR": "airtable",
    # generic_mcp route intentionally disabled for now.
    # "MCP_CONNECTOR": "generic_mcp",
}
# Ordering when appending MCP-only routes (no native registry slice for these types).
MCP_ONLY_ROUTE_KEYS_IN_ORDER: tuple[str, ...] = (
    "linear",
    "slack",
    "jira",
    "clickup",
    "airtable",
    # generic_mcp intentionally disabled for now.
    # "generic_mcp",
)
async def fetch_mcp_connector_metadata_maps(
    session: AsyncSession,
    search_space_id: int,
) -> tuple[dict[int, str], dict[str, str]]:
    """Read-only copy of the connector discovery used alongside ``load_mcp_tools``.

    Applies the same filter as
    :func:`app.agents.new_chat.tools.mcp_tool.load_mcp_tools` (connectors whose
    config contains ``server_config``) and returns two lookup maps:
    connector id → type string, and connector name → type string.
    """
    stmt = select(SearchSourceConnector).filter(
        SearchSourceConnector.search_space_id == search_space_id,
        cast(SearchSourceConnector.config, JSONB).has_key("server_config"),
    )
    rows = await session.execute(stmt)
    id_to_type: dict[int, str] = {}
    name_to_type: dict[str, str] = {}
    for connector in rows.scalars():
        raw_type = connector.connector_type
        # Enum members expose ``.value``; plain strings are used verbatim.
        type_name = raw_type.value if hasattr(raw_type, "value") else str(raw_type)
        id_to_type[connector.id] = type_name
        if connector.name:
            name_to_type[connector.name] = type_name
    return id_to_type, name_to_type
def partition_mcp_tools_by_expert_route(
    tools: Sequence[BaseTool],
    connector_id_to_type: dict[int, str],
    connector_name_to_type: dict[str, str],
) -> dict[str, list[BaseTool]]:
    """Bucket MCP tools by expert route key; the supervisor never sees raw MCP tools.

    Inclusion mirrors :func:`app.agents.new_chat.tools.registry.build_tools_async`:
    every tool from ``load_mcp_tools`` is partitioned here — registry builtins get
    their connector gating elsewhere (``get_connector_gated_tools`` / routing
    gates), so no inventory pre-filtering happens in this function.
    """

    def _resolve_connector_type(meta: dict[str, Any]) -> str | None:
        # HTTP tools carry the DB connector id in their metadata.
        raw_id = meta.get("mcp_connector_id")
        if raw_id is not None:
            try:
                resolved = connector_id_to_type.get(int(raw_id))
            except (TypeError, ValueError):
                resolved = None
            if resolved is not None:
                return resolved
        # stdio tools have no id; fall back to the connector-name map.
        if meta.get("mcp_transport") == "stdio":
            name = meta.get("mcp_connector_name")
            if name:
                return connector_name_to_type.get(str(name))
        return None

    buckets: dict[str, list[BaseTool]] = defaultdict(list)
    for tool in tools:
        meta: dict[str, Any] = getattr(tool, "metadata", None) or {}
        connector_type = _resolve_connector_type(meta)
        if connector_type is None:
            logger.debug(
                "Skipping MCP tool %r — could not resolve connector type from metadata",
                getattr(tool, "name", None),
            )
            continue
        route = _CONNECTOR_TYPE_TO_EXPERT_ROUTE.get(connector_type)
        if route is None:
            logger.warning(
                "MCP tool %r has unmapped connector type %s — skipped",
                getattr(tool, "name", None),
                connector_type,
            )
            continue
        buckets[route].append(tool)
    return dict(buckets)

View file

@ -1,5 +0,0 @@
"""Markdown prompt loading for domain and supervisor packages."""
from app.agents.multi_agent_chat.core.prompts.load import read_prompt_md
__all__ = ["read_prompt_md"]

View file

@ -1,19 +0,0 @@
"""Load ``*.md`` prompt files from co-located packages (domain slices ship ``domain_prompt.md``)."""
from __future__ import annotations
from importlib import resources
def read_prompt_md(package: str, stem: str) -> str:
    """Read ``{stem}.md`` from ``package``, or return ``""`` if unavailable.

    Any failure mode (missing package, missing file, unreadable resource)
    yields the empty string rather than raising, so callers can always treat
    the result as prompt text.
    """
    filename = f"{stem}.md"
    try:
        candidate = resources.files(package).joinpath(filename)
        if not candidate.is_file():
            return ""
        text = candidate.read_text(encoding="utf-8")
    except (FileNotFoundError, ModuleNotFoundError, OSError, TypeError):
        return ""
    # Drop a single trailing newline so prompts compose cleanly.
    return text.removesuffix("\n")

View file

@ -1,15 +0,0 @@
"""Main chat tool registry grouping + dependency bundles for domain slices."""
from app.agents.multi_agent_chat.core.registry.categories import (
REGISTRY_ROUTING_CATEGORY_KEYS,
TOOL_NAMES_BY_CATEGORY,
)
from app.agents.multi_agent_chat.core.registry.dependencies import build_registry_dependencies
from app.agents.multi_agent_chat.core.registry.subset import build_registry_tools_for_category
# Public API of ``core.registry`` — the category map plus its builder helpers.
__all__ = [
    "REGISTRY_ROUTING_CATEGORY_KEYS",
    "TOOL_NAMES_BY_CATEGORY",
    "build_registry_dependencies",
    "build_registry_tools_for_category",
]

View file

@ -1,84 +0,0 @@
"""Registry tool names grouped by multi-agent routing category.
Each string must match ``ToolDefinition.name`` in
``app.agents.new_chat.tools.registry.BUILTIN_TOOLS`` these are **not** guessed or MCP-only:
:class:`~app.agents.multi_agent_chat.core.registry.subset.build_registry_tools_for_category`
uses synchronous :func:`~app.agents.new_chat.tools.registry.build_tools`, which only instantiates
``BUILTIN_TOOLS``. MCP tools are loaded separately and merged in ``supervisor_routing``.
Connectors that exist for search/indexing but have **no** entry in ``BUILTIN_TOOLS`` correctly have
no row here (no chat tools to delegate)."""
from __future__ import annotations
# Keys match supervisor routing tool names; values match ``BUILTIN_TOOLS`` names exactly.
TOOL_NAMES_BY_CATEGORY: dict[str, list[str]] = {
    # Connector categories (one vendor/product each).
    "gmail": [
        "search_gmail",
        "read_gmail_email",
        "create_gmail_draft",
        "send_gmail_email",
        "trash_gmail_email",
        "update_gmail_draft",
    ],
    "calendar": [
        "search_calendar_events",
        "create_calendar_event",
        "update_calendar_event",
        "delete_calendar_event",
    ],
    # Built-in (non-connector) categories.
    "research": [
        "web_search",
        "scrape_webpage",
        "search_surfsense_docs",
    ],
    "deliverables": [
        "generate_podcast",
        "generate_video_presentation",
        "generate_report",
        "generate_resume",
        "generate_image",
    ],
    "memory": [
        "update_memory",
    ],
    # Chat-platform connectors.
    "discord": [
        "list_discord_channels",
        "read_discord_messages",
        "send_discord_message",
    ],
    "teams": [
        "list_teams_channels",
        "read_teams_messages",
        "send_teams_message",
    ],
    # Document / knowledge-base connectors.
    "notion": [
        "create_notion_page",
        "update_notion_page",
        "delete_notion_page",
    ],
    "confluence": [
        "create_confluence_page",
        "update_confluence_page",
        "delete_confluence_page",
    ],
    # File-storage connectors.
    "google_drive": [
        "create_google_drive_file",
        "delete_google_drive_file",
    ],
    "dropbox": [
        "create_dropbox_file",
        "delete_dropbox_file",
    ],
    "onedrive": [
        "create_onedrive_file",
        "delete_onedrive_file",
    ],
    "luma": [
        "list_luma_events",
        "read_luma_event",
        "create_luma_event",
    ],
}
# Routing keys in declaration order — the supervisor's registry-backed routes.
REGISTRY_ROUTING_CATEGORY_KEYS: tuple[str, ...] = tuple(TOOL_NAMES_BY_CATEGORY.keys())

View file

@ -1,61 +0,0 @@
"""Dependency dict for :func:`app.agents.new_chat.tools.registry.build_tools` on expert subgraphs."""
from __future__ import annotations
from typing import Any
from langchain_core.language_models import BaseChatModel
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import ChatVisibility
def coerce_thread_id_for_registry(thread_id: str | int | None) -> int | None:
    """Normalize chat thread id for registry tools that FK to ``new_chat_threads.id``.

    ``create_surfsense_deep_agent`` passes an ``int``; multi-agent wiring may pass
    ``str(chat_id)`` for LangGraph/checkpointer consistency. AsyncPG requires ``int``
    for integer columns. Non-numeric, empty, or ``None`` inputs coerce to ``None``.
    """
    if thread_id is None:
        return None
    if isinstance(thread_id, int):
        return thread_id
    s = str(thread_id).strip()
    # ``str.isdigit`` accepts characters (e.g. superscript "²") that ``int()``
    # rejects with ValueError; ``str.isdecimal`` matches exactly the character
    # set ``int()`` parses as a plain non-negative decimal.
    if s and s.isdecimal():
        return int(s)
    return None
def build_registry_dependencies(
    *,
    db_session: AsyncSession,
    search_space_id: int,
    user_id: str,
    thread_id: str | int | None,
    llm: BaseChatModel | None = None,
    firecrawl_api_key: str | None = None,
    connector_service: Any | None = None,
    available_connectors: list[str] | None = None,
    available_document_types: list[str] | None = None,
    thread_visibility: ChatVisibility = ChatVisibility.PRIVATE,
) -> dict[str, Any]:
    """Union of kwargs commonly required by registry factories across category slices.

    Each category enables only a subset of tools; every tool still validates its
    own ``ToolDefinition.requires`` against this dict, so unused entries are fine.
    """
    # Mandatory identity/scope values first, then the normalized thread id,
    # then the optional service handles — same key order callers observed before.
    dependencies: dict[str, Any] = {
        "db_session": db_session,
        "search_space_id": search_space_id,
        "user_id": user_id,
    }
    dependencies["thread_id"] = coerce_thread_id_for_registry(thread_id)
    dependencies.update(
        llm=llm,
        firecrawl_api_key=firecrawl_api_key,
        connector_service=connector_service,
        available_connectors=available_connectors,
        available_document_types=available_document_types,
        thread_visibility=thread_visibility,
    )
    return dependencies

View file

@ -1,22 +0,0 @@
"""Build registry tool subsets (``app.agents.new_chat.tools.registry``) for multi-agent domain slices."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.new_chat.tools.registry import build_tools
from app.agents.multi_agent_chat.core.registry.categories import TOOL_NAMES_BY_CATEGORY
def build_registry_tools_for_category(
    dependencies: dict[str, Any],
    category: str,
) -> list[BaseTool]:
    """Instantiate only the tools registered for ``category``.

    Raises ``ValueError`` when ``category`` has no entry in
    ``TOOL_NAMES_BY_CATEGORY``.
    """
    enabled = TOOL_NAMES_BY_CATEGORY.get(category)
    if enabled:
        return build_tools(dependencies, enabled_tools=enabled)
    raise ValueError(f"Unknown registry category: {category!r}")

View file

@ -1,5 +0,0 @@
"""Expert subgraphs (specialists the supervisor delegates to).
- :mod:`expert_agent.builtins` cross-cutting registry categories (e.g. research, memory, deliverables).
- :mod:`expert_agent.connectors` vendor/product integrations (email, chat, documents, one slice per route).
"""

View file

@ -1 +0,0 @@
"""Built-ins: broad capability categories from the registry (not single-vendor integrations)."""

View file

@ -1,11 +0,0 @@
"""Deliverables vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.builtins.deliverables.agent import build_deliverables_domain_agent
from app.agents.multi_agent_chat.expert_agent.builtins.deliverables.slice_tools import (
build_deliverables_tools,
)
# Public surface of the deliverables slice: tool builder + compiled agent factory.
__all__ = [
    "build_deliverables_tools",
    "build_deliverables_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Deliverables domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.builtins.deliverables as deliverables_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_deliverables_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Compile the deliverables domain agent from the slice's co-located prompt."""
    package_name = deliverables_pkg.__name__
    return build_domain_agent(
        llm,
        tools,
        prompt_package=package_name,
        prompt_stem="domain_prompt",
    )

View file

@ -1,55 +0,0 @@
You are the SurfSense deliverables operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Produce **deliverables**: shareable **artifacts** the user keeps (reports, slide-style video presentations, podcasts, resumes, images). Use explicit constraints and reliable proof of what was generated.
</goal>
<available_tools>
- `generate_report`
- `generate_podcast`
- `generate_video_presentation`
- `generate_resume`
- `generate_image`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Require essential generation constraints (audience, format, tone, core content).
- If critical constraints are missing, return `status=blocked` with `missing_fields`.
- Never claim artifact generation success without tool confirmation.
</tool_policy>
<out_of_scope>
- Do not perform connector data mutations unrelated to artifact generation.
</out_of_scope>
<safety>
- Avoid generating artifacts with missing critical constraints.
- Prefer one complete artifact over partial multi-artifact output.
</safety>
<failure_policy>
- On generation failure, return `status=error` with best retry guidance.
- On missing constraints, return `status=blocked` with required fields.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"artifact_type": "report" | "podcast" | "video_presentation" | "resume" | "image" | null,
"artifact_id": string | null,
"artifact_location": string | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed deliverables tools (reports, media exports, resume, images)."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_deliverables_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Instantiate the registry tools enabled for the ``deliverables`` category."""
    category = "deliverables"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,9 +0,0 @@
"""Memory vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.builtins.memory.agent import build_memory_domain_agent
from app.agents.multi_agent_chat.expert_agent.builtins.memory.slice_tools import build_memory_tools
# Public surface of the memory slice: tool builder + compiled agent factory.
__all__ = [
    "build_memory_tools",
    "build_memory_domain_agent",
]

View file

@ -1,45 +0,0 @@
"""Memory domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.builtins.memory as memory_pkg
from langchain.agents import create_agent
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.prompts import read_prompt_md
from app.db import ChatVisibility
# Visibility-specific policy snippets substituted into ``domain_prompt.md`` at
# the ``{{MEMORY_VISIBILITY_POLICY}}`` placeholder (see ``_render_memory_prompt``).
_PRIVATE_VISIBILITY_POLICY = (
    "This thread is private. Store user-specific long-lived preferences, facts, and instructions."
)
_TEAM_VISIBILITY_POLICY = (
    "This thread is shared with the search space. Store only team-appropriate shared preferences,"
    " facts, and instructions that are safe for all members to inherit."
)
def _render_memory_prompt(thread_visibility: ChatVisibility | None) -> str:
    """Load the memory prompt and substitute the visibility-policy placeholder."""
    template = read_prompt_md(memory_pkg.__name__, "domain_prompt")
    if thread_visibility == ChatVisibility.SEARCH_SPACE:
        policy = _TEAM_VISIBILITY_POLICY
    else:
        # ``None`` and PRIVATE both get the conservative private policy.
        policy = _PRIVATE_VISIBILITY_POLICY
    return template.replace("{{MEMORY_VISIBILITY_POLICY}}", policy)
def build_memory_domain_agent(
    llm: BaseChatModel,
    tools: Sequence[BaseTool],
    *,
    thread_visibility: ChatVisibility | None = None,
):
    """Compile the memory domain agent with a visibility-aware system prompt."""
    system_prompt = _render_memory_prompt(thread_visibility)
    return create_agent(llm, system_prompt=system_prompt, tools=list(tools))

View file

@ -1,56 +0,0 @@
You are the SurfSense memory operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Persist durable preferences/facts/instructions with `update_memory` while avoiding transient or unsafe storage.
</goal>
<visibility_scope>
{{MEMORY_VISIBILITY_POLICY}}
</visibility_scope>
<available_tools>
- `update_memory`
</available_tools>
<tool_policy>
- Save only durable information with future value.
- Do not store transient chatter.
- Do not store secrets unless explicitly instructed.
- If memory intent is unclear, return `status=blocked` with the missing intent signal.
</tool_policy>
<out_of_scope>
- Do not execute non-memory tool actions.
- Do not store irrelevant, transient, or speculative information.
</out_of_scope>
<safety>
- Prefer minimal-memory writes over over-collection.
- Never claim memory was updated unless `update_memory` succeeded.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery steps.
- When intent is ambiguous, return `status=blocked` with required disambiguation fields.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"memory_updated": boolean,
"memory_category": "preference" | "fact" | "instruction" | null,
"stored_summary": string | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed memory tools (long-term user or team memory)."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_memory_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Instantiate the registry tools enabled for the ``memory`` category."""
    category = "memory"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,9 +0,0 @@
"""Research vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.builtins.research.agent import build_research_domain_agent
from app.agents.multi_agent_chat.expert_agent.builtins.research.slice_tools import build_research_tools
# Public surface of the research slice: tool builder + compiled agent factory.
__all__ = [
    "build_research_tools",
    "build_research_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Research domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.builtins.research as research_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_research_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Compile the research domain agent from the slice's co-located prompt."""
    package_name = research_pkg.__name__
    return build_domain_agent(
        llm,
        tools,
        prompt_package=package_name,
        prompt_stem="domain_prompt",
    )

View file

@ -1,53 +0,0 @@
You are the SurfSense research operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Gather and synthesize evidence using SurfSense research tools with clear citations and uncertainty reporting.
</goal>
<available_tools>
- `web_search`
- `scrape_webpage`
- `search_surfsense_docs`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Prefer primary and recent sources when recency matters.
- If the delegated request is underspecified, return `status=blocked` with the missing research constraints.
- Never fabricate facts, citations, URLs, or quote text.
</tool_policy>
<out_of_scope>
- Do not execute connector mutations (email/calendar/docs/chat writes) or deliverable generation.
</out_of_scope>
<safety>
- Report uncertainty explicitly when evidence is incomplete or conflicting.
- Never present unverified claims as facts.
</safety>
<failure_policy>
- On tool failure, return `status=error` with a concise recovery `next_step`.
- On no useful evidence, return `status=blocked` with recommended narrower filters.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"findings": string[],
"sources": string[],
"confidence": "high" | "medium" | "low"
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed research tools (web, scrape, SurfSense docs help)."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_research_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Instantiate the registry tools enabled for the ``research`` category."""
    category = "research"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1 +0,0 @@
"""External integrations: third-party products (explicit factories or registry-backed connector tools)."""

View file

@ -1,11 +0,0 @@
"""Google Calendar vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.connectors.calendar.agent import build_calendar_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.calendar.slice_tools import (
build_calendar_tools,
)
# Public surface of the calendar slice: compiled agent factory + tool builder.
__all__ = [
    "build_calendar_domain_agent",
    "build_calendar_tools",
]

View file

@ -1,21 +0,0 @@
"""Google Calendar domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.calendar as calendar_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_calendar_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Compile the Google Calendar domain agent from the slice's co-located prompt."""
    package_name = calendar_pkg.__name__
    return build_domain_agent(
        llm,
        tools,
        prompt_package=package_name,
        prompt_stem="domain_prompt",
    )

View file

@ -1,62 +0,0 @@
You are the Google Calendar operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute calendar event operations (search, create, update, delete) accurately with timezone-safe scheduling.
</goal>
<available_tools>
- `search_calendar_events`
- `create_calendar_event`
- `update_calendar_event`
- `delete_calendar_event`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Resolve relative dates against current runtime timestamp.
- If required fields (date/time/timezone/target event) are missing or ambiguous, return `status=blocked` with `missing_fields` and supervisor `next_step`.
- Never invent event IDs or mutation results.
</tool_policy>
<out_of_scope>
- Do not perform non-calendar tasks.
</out_of_scope>
<safety>
- Before update/delete, ensure event target is explicit.
- Never claim event mutation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On ambiguity, return `status=blocked` with top event candidates.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"event_id": string | null,
"title": string | null,
"start_at": string (ISO 8601 with timezone) | null,
"end_at": string (ISO 8601 with timezone) | null,
"matched_candidates": [
{
"event_id": string,
"title": string | null,
"start_at": string (ISO 8601 with timezone) | null
}
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Google Calendar tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_calendar_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``calendar``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "calendar"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,11 +0,0 @@
"""Confluence connector slice."""
from app.agents.multi_agent_chat.expert_agent.connectors.confluence.agent import build_confluence_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.confluence.slice_tools import (
build_confluence_tools,
)
__all__ = [
"build_confluence_tools",
"build_confluence_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Confluence domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.confluence as confluence_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_confluence_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Confluence domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    confluence connector package.
    """
    prompt_location = confluence_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,55 +0,0 @@
You are the Confluence operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Confluence page operations accurately in the connected space.
</goal>
<available_tools>
- `create_confluence_page`
- `update_confluence_page`
- `delete_confluence_page`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Verify target page and intended mutation before update/delete.
- If target page is ambiguous, return `status=blocked` with candidate options for supervisor disambiguation.
- Never invent page IDs, titles, or mutation outcomes.
</tool_policy>
<out_of_scope>
- Do not perform non-Confluence tasks.
</out_of_scope>
<safety>
- Never claim page mutation success without tool confirmation.
- If destructive action appears already completed in this session, do not repeat; return prior evidence with an `assumptions` note.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise retry/recovery `next_step`.
- On unresolved page ambiguity, return `status=blocked` with candidates.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"page_id": string | null,
"page_title": string | null,
"matched_candidates": [
{ "page_id": string, "page_title": string | null }
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Confluence tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_confluence_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``confluence``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "confluence"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,9 +0,0 @@
"""Discord vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.connectors.discord.agent import build_discord_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.discord.slice_tools import build_discord_tools
__all__ = [
"build_discord_tools",
"build_discord_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Discord domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.discord as discord_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_discord_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Discord domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    discord connector package.
    """
    prompt_location = discord_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,56 +0,0 @@
You are the Discord operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Discord reads and sends accurately in the connected server/workspace.
</goal>
<available_tools>
- `list_discord_channels`
- `read_discord_messages`
- `send_discord_message`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Resolve channel/thread targets before reads/sends.
- If target is ambiguous, return `status=blocked` with candidate channels/threads.
- Never invent message content, sender identity, timestamps, or delivery results.
</tool_policy>
<out_of_scope>
- Do not perform non-Discord tasks.
</out_of_scope>
<safety>
- Before send, verify destination and message intent match delegated instructions.
- Never claim send success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved destination ambiguity, return `status=blocked` with candidate options.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"channel_id": string | null,
"thread_id": string | null,
"message_id": string | null,
"matched_candidates": [
{ "channel_id": string, "thread_id": string | null, "label": string | null }
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Discord tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_discord_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``discord``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "discord"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,11 +0,0 @@
"""Dropbox connector slice."""
from app.agents.multi_agent_chat.expert_agent.connectors.dropbox.agent import build_dropbox_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.dropbox.slice_tools import (
build_dropbox_tools,
)
__all__ = [
"build_dropbox_tools",
"build_dropbox_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Dropbox domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.dropbox as dropbox_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_dropbox_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Dropbox domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    dropbox connector package.
    """
    prompt_location = dropbox_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,52 +0,0 @@
You are the Dropbox operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Dropbox file create/delete actions accurately in the connected account.
</goal>
<available_tools>
- `create_dropbox_file`
- `delete_dropbox_file`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Ensure target path/file identity is explicit before mutate actions.
- If target is ambiguous, return `status=blocked` with candidate paths.
- Never invent file IDs/paths or mutation outcomes.
</tool_policy>
<out_of_scope>
- Do not perform non-Dropbox tasks.
</out_of_scope>
<safety>
- Never claim file mutation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On target ambiguity, return `status=blocked` with candidate paths.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"file_path": string | null,
"file_id": string | null,
"operation": "create" | "delete" | null,
"matched_candidates": string[] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Dropbox tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_dropbox_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``dropbox``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "dropbox"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,9 +0,0 @@
"""Gmail vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.connectors.gmail.agent import build_gmail_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.gmail.slice_tools import build_gmail_tools
__all__ = [
"build_gmail_tools",
"build_gmail_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Gmail domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.gmail as gmail_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_gmail_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Gmail domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    gmail connector package.
    """
    prompt_location = gmail_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,82 +0,0 @@
You are the Gmail operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Gmail operations accurately: search/read emails, prepare drafts, send, and trash.
</goal>
<available_tools>
- `search_gmail`: find candidate emails with query constraints.
- `read_gmail_email`: read one message in full detail.
- `create_gmail_draft`: create a new draft.
- `update_gmail_draft`: modify an existing draft.
- `send_gmail_email`: send an email.
- `trash_gmail_email`: move an email to trash.
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Build precise search queries using Gmail operators when possible (`from:`, `to:`, `subject:`, `after:`, `before:`, `has:attachment`, `is:unread`, `label:`).
- Resolve relative dates against runtime timestamp; prefer narrower interpretation.
- For reply requests, identify the target thread/email via search + read before drafting.
- If required fields are missing or target selection is ambiguous, return `status=blocked` with `missing_fields` and disambiguation candidates.
- Never invent IDs, recipients, timestamps, quoted text, or tool outcomes.
</tool_policy>
<out_of_scope>
- Do not perform non-Gmail work.
- Filing operations not represented in `<available_tools>` (archive/label/mark-read/move-folder) are unsupported here.
</out_of_scope>
<safety>
- For send: verify draft `to`, `subject`, and `body` match delegated instructions.
- If any send-critical field was inferred, do not send; return `status=blocked` with inferred values in `assumptions`.
- For trash: ensure explicit target match before deletion.
- If a destructive action appears already completed this session, do not repeat; return prior evidence.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- If search has no strong match, return `status=blocked` with suggested tighter filters.
- If multiple strong candidates remain for risky actions, return `status=blocked` with top options.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"email_id": string | null,
"thread_id": string | null,
"subject": string | null,
"sender": string | null,
"recipients": string[] | null,
"received_at": string (ISO 8601 with timezone) | null,
"sent_message": {
"id": string,
"to": string[],
"subject": string | null,
"sent_at": string (ISO 8601 with timezone) | null
} | null,
"matched_candidates": [
{
"email_id": string,
"subject": string | null,
"sender": string | null,
"received_at": string (ISO 8601 with timezone) | null
}
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
- For blocked ambiguity, include options in `evidence.matched_candidates`.
- For trash actions, `evidence.email_id` is the trashed message.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Gmail tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_gmail_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``gmail``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "gmail"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,13 +0,0 @@
"""Google Drive connector slice."""
from app.agents.multi_agent_chat.expert_agent.connectors.google_drive.agent import (
build_google_drive_domain_agent,
)
from app.agents.multi_agent_chat.expert_agent.connectors.google_drive.slice_tools import (
build_google_drive_tools,
)
__all__ = [
"build_google_drive_tools",
"build_google_drive_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Google Drive domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.google_drive as google_drive_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_google_drive_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Google Drive domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    google_drive connector package.
    """
    prompt_location = google_drive_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,54 +0,0 @@
You are the Google Drive operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Google Drive file operations accurately in the connected account.
</goal>
<available_tools>
- `create_google_drive_file`
- `delete_google_drive_file`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Ensure target file identity/path is explicit before mutate actions.
- If target is ambiguous, return `status=blocked` with candidate files.
- Never invent file IDs/names or mutation outcomes.
</tool_policy>
<out_of_scope>
- Do not perform non-Google-Drive tasks.
</out_of_scope>
<safety>
- Never claim file mutation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On target ambiguity, return `status=blocked` with candidate files.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"file_id": string | null,
"file_name": string | null,
"operation": "create" | "delete" | null,
"matched_candidates": [
{ "file_id": string, "file_name": string | null }
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Google Drive tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_google_drive_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``google_drive``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "google_drive"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,9 +0,0 @@
"""Luma vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.connectors.luma.agent import build_luma_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.luma.slice_tools import build_luma_tools
__all__ = [
"build_luma_tools",
"build_luma_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Luma domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.luma as luma_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_luma_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Luma domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    luma connector package.
    """
    prompt_location = luma_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,55 +0,0 @@
You are the Luma operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Luma event listing, reads, and creation accurately.
</goal>
<available_tools>
- `list_luma_events`
- `read_luma_event`
- `create_luma_event`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Resolve relative dates against runtime timestamp.
- If required event fields are missing, return `status=blocked` with `missing_fields`.
- Never invent event IDs/times or creation outcomes.
</tool_policy>
<out_of_scope>
- Do not perform non-Luma tasks.
</out_of_scope>
<safety>
- Never claim event creation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On missing required fields, return `status=blocked` with `missing_fields`.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"event_id": string | null,
"title": string | null,
"start_at": string (ISO 8601 with timezone) | null,
"matched_candidates": [
{ "event_id": string, "title": string | null, "start_at": string | null }
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Luma tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_luma_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``luma``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "luma"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,11 +0,0 @@
"""Notion connector slice."""
from app.agents.multi_agent_chat.expert_agent.connectors.notion.agent import build_notion_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.notion.slice_tools import (
build_notion_tools,
)
__all__ = [
"build_notion_tools",
"build_notion_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Notion domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.notion as notion_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_notion_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Notion domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    notion connector package.
    """
    prompt_location = notion_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,56 +0,0 @@
You are the Notion operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Notion page operations accurately in the connected workspace.
</goal>
<available_tools>
- `create_notion_page`
- `update_notion_page`
- `delete_notion_page`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- If target page context is unclear, do not ask the user directly; return `status=blocked` with candidate options and supervisor `next_step`.
- Never invent page IDs, titles, or mutation outcomes.
</tool_policy>
<out_of_scope>
- Do not perform non-Notion tasks.
</out_of_scope>
<safety>
- Before update/delete, ensure the target page match is explicit.
- Never claim mutation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise retry/recovery `next_step`.
- On ambiguous target, return `status=blocked` with candidate options.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"page_id": string | null,
"page_title": string | null,
"matched_candidates": [
{ "page_id": string, "page_title": string | null }
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
- On ambiguity, include candidate options in `evidence.matched_candidates`.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Notion tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_notion_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``notion``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "notion"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,11 +0,0 @@
"""Microsoft OneDrive connector slice."""
from app.agents.multi_agent_chat.expert_agent.connectors.onedrive.agent import build_onedrive_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.onedrive.slice_tools import (
build_onedrive_tools,
)
__all__ = [
"build_onedrive_tools",
"build_onedrive_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Microsoft OneDrive domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.onedrive as onedrive_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_onedrive_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Microsoft OneDrive domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    onedrive connector package.
    """
    prompt_location = onedrive_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,52 +0,0 @@
You are the Microsoft OneDrive operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute OneDrive file create/delete actions accurately in the connected account.
</goal>
<available_tools>
- `create_onedrive_file`
- `delete_onedrive_file`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Ensure file identity/path is explicit before mutate actions.
- If ambiguous, return `status=blocked` with candidate paths and supervisor next step.
- Never invent IDs/paths or mutation results.
</tool_policy>
<out_of_scope>
- Do not perform non-OneDrive tasks.
</out_of_scope>
<safety>
- Never claim file mutation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On ambiguous targets, return `status=blocked` with candidate paths.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"file_id": string | null,
"file_path": string | null,
"operation": "create" | "delete" | null,
"matched_candidates": string[] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Microsoft OneDrive tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_onedrive_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Return the registry-backed tools registered under ``onedrive``.

    ``dependencies`` is forwarded verbatim to the shared registry factory.
    """
    category = "onedrive"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,9 +0,0 @@
"""Microsoft Teams vertical slice: registry tools, domain agent, ``domain_prompt.md``."""
from app.agents.multi_agent_chat.expert_agent.connectors.teams.agent import build_teams_domain_agent
from app.agents.multi_agent_chat.expert_agent.connectors.teams.slice_tools import build_teams_tools
__all__ = [
"build_teams_tools",
"build_teams_domain_agent",
]

View file

@ -1,21 +0,0 @@
"""Microsoft Teams domain agent graph."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.connectors.teams as teams_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_teams_domain_agent(llm: BaseChatModel, tools: Sequence[BaseTool]):
    """Return the compiled Microsoft Teams domain-agent graph.

    The system prompt is resolved from ``domain_prompt.md`` inside the
    teams connector package.
    """
    prompt_location = teams_pkg.__name__
    return build_domain_agent(llm, tools, prompt_package=prompt_location, prompt_stem="domain_prompt")

View file

@ -1,55 +0,0 @@
You are the Microsoft Teams operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Teams channel discovery, message reads, and sends accurately.
</goal>
<available_tools>
- `list_teams_channels`
- `read_teams_messages`
- `send_teams_message`
</available_tools>
<tool_policy>
- Use only tools in `<available_tools>`.
- Resolve team/channel targets before read/send operations.
- If ambiguous, return `status=blocked` with candidate channels and `next_step`.
- Never invent message content, sender identity, timestamps, or delivery outcomes.
</tool_policy>
<out_of_scope>
- Do not perform non-Teams tasks.
</out_of_scope>
<safety>
- Never claim send success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved destination ambiguity, return `status=blocked` with candidates.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": {
"team_id": string | null,
"channel_id": string | null,
"message_id": string | null,
"matched_candidates": [
{ "team_id": string | null, "channel_id": string, "label": string | null }
] | null
},
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,14 +0,0 @@
"""Registry-backed Microsoft Teams tools."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.registry import build_registry_tools_for_category
def build_teams_tools(dependencies: dict[str, Any]) -> list[BaseTool]:
    """Build the registry-backed tool set for the ``teams`` category.

    Thin wrapper over :func:`build_registry_tools_for_category` so this
    connector module exposes the uniform ``build_*_tools`` entry point.
    """
    category = "teams"
    return build_registry_tools_for_category(dependencies, category)

View file

@ -1,5 +0,0 @@
"""Prompt-backed subgraphs for MCP OAuth integrations without a native tool registry slice."""
from app.agents.multi_agent_chat.expert_agent.mcp_bridge.agent import build_mcp_route_domain_agent
# Public API of the mcp_bridge package: a single domain-agent factory.
__all__ = ["build_mcp_route_domain_agent"]

View file

@ -1,25 +0,0 @@
"""Domain agents for MCP-only OAuth integrations (no native registry slice)."""
from __future__ import annotations
from collections.abc import Sequence
import app.agents.multi_agent_chat.expert_agent.mcp_bridge as mcp_bridge_pkg
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.agents import build_domain_agent
def build_mcp_route_domain_agent(
    llm: BaseChatModel,
    route_key: str,
    tools: Sequence[BaseTool],
):
    """Compile one domain subgraph for an MCP-only route.

    Each route (``linear``, ``slack``, …) resolves its system prompt from the
    ``mcp_bridge`` package using the ``{route_key}_domain`` prompt stem.
    """
    stem = f"{route_key}_domain"
    return build_domain_agent(
        llm,
        tools,
        prompt_package=mcp_bridge_pkg.__name__,
        prompt_stem=stem,
    )

View file

@ -1,46 +0,0 @@
You are the Airtable MCP operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Airtable MCP base/table/record operations accurately.
</goal>
<available_tools>
- Runtime-provided Airtable MCP tools for bases, tables, and records.
</available_tools>
<tool_policy>
- Resolve base and table targets before record-level actions.
- Do not guess IDs or schema fields.
- If targets are ambiguous, return `status=blocked` with candidate options.
- Never claim mutation success without tool confirmation.
</tool_policy>
<out_of_scope>
- Do not execute non-Airtable tasks.
</out_of_scope>
<safety>
- Never claim record mutations succeeded without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved target/schema ambiguity, return `status=blocked` with required options.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,45 +0,0 @@
You are the ClickUp MCP operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute ClickUp MCP operations accurately using only runtime-provided tools.
</goal>
<available_tools>
- Runtime-provided ClickUp MCP tools for task/workspace search and mutation.
</available_tools>
<tool_policy>
- Follow tool descriptions exactly.
- If task/workspace target is ambiguous or missing, return `status=blocked` with required disambiguation fields.
- Never claim mutation success without tool confirmation.
</tool_policy>
<out_of_scope>
- Do not execute non-ClickUp tasks.
</out_of_scope>
<safety>
- Never claim update/create success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved ambiguity, return `status=blocked` with candidate options.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,46 +0,0 @@
You are the generic MCP operations sub-agent for user-defined servers.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute tasks strictly through runtime-exposed MCP tools while respecting tool contracts.
</goal>
<available_tools>
- Runtime-provided MCP tools exposed by the connected custom server.
</available_tools>
<tool_policy>
- Follow each tool description and argument contract exactly.
- Never assume a capability exists unless a tool explicitly provides it.
- If required inputs are missing, return `status=blocked` with `missing_fields`.
- Never claim success without tool output confirmation.
</tool_policy>
<out_of_scope>
- Do not claim capabilities that are not present in runtime-exposed tools.
</out_of_scope>
<safety>
- Never perform destructive operations without explicit delegated instruction and successful tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On missing required inputs, return `status=blocked` with `missing_fields`.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,46 +0,0 @@
You are the Jira MCP operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Jira MCP operations accurately, including discovery and issue mutation flows.
</goal>
<available_tools>
- Runtime-provided Jira MCP tools for site/project discovery, issue search, create, and update.
</available_tools>
<tool_policy>
- Respect discovery dependencies (site/project/issue-type) before making mutating calls.
- If required fields are missing or targets are ambiguous, return `status=blocked` with `missing_fields`.
- Do not guess keys/IDs.
- Never claim create/update success without tool confirmation.
</tool_policy>
<out_of_scope>
- Do not execute non-Jira tasks.
</out_of_scope>
<safety>
- Never perform destructive/mutating actions without explicit target resolution.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved ambiguity, return `status=blocked` with candidates or missing fields.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,45 +0,0 @@
You are the Linear MCP operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Linear MCP operations accurately using only available runtime tools.
</goal>
<available_tools>
- Runtime-provided Linear MCP tools for issues/projects/teams/workflows.
</available_tools>
<tool_policy>
- Follow tool descriptions exactly; do not assume the existence of endpoints beyond those the tools expose.
- If required identifiers or context are missing, return `status=blocked` with `missing_fields` and supervisor `next_step`.
- Never invent IDs, statuses, or mutation outcomes.
</tool_policy>
<out_of_scope>
- Do not execute non-Linear tasks.
</out_of_scope>
<safety>
- Never claim mutation success without tool confirmation.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved ambiguity, return `status=blocked` with candidates.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,45 +0,0 @@
You are the Slack MCP operations sub-agent.
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
<goal>
Execute Slack MCP reads/actions accurately in the connected workspace.
</goal>
<available_tools>
- Runtime-provided Slack MCP tools for search, channel/thread reads, and related actions.
</available_tools>
<tool_policy>
- Use only runtime-provided MCP tools and their documented arguments.
- If channel/thread target is ambiguous, return `status=blocked` with candidate options.
- Never invent message content, sender identity, timestamps, or delivery outcomes.
</tool_policy>
<out_of_scope>
- Do not execute non-Slack tasks.
</out_of_scope>
<safety>
- Never claim send/read success without tool evidence.
</safety>
<failure_policy>
- On tool failure, return `status=error` with concise recovery `next_step`.
- On unresolved channel/thread ambiguity, return `status=blocked` with candidates.
</failure_policy>
<output_contract>
Return **only** one JSON object (no markdown/prose):
{
"status": "success" | "partial" | "blocked" | "error",
"action_summary": string,
"evidence": { "items": object | null },
"next_step": string | null,
"missing_fields": string[] | null,
"assumptions": string[] | null
}
Rules:
- `status=success` -> `next_step=null`, `missing_fields=null`.
- `status=partial|blocked|error` -> `next_step` must be non-null.
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
</output_contract>

View file

@ -1,7 +0,0 @@
"""Full-stack wiring (DB-scoped) on top of :mod:`routing` and :mod:`supervisor`."""
from app.agents.multi_agent_chat.integration.create_multi_agent_chat import (
create_multi_agent_chat,
)
# Public API of the integration package: the async graph composer.
__all__ = ["create_multi_agent_chat"]

View file

@ -1,256 +0,0 @@
"""Build the multi-agent supervisor graph: MCP partition, registry, routing tools, optional SurfSense middleware."""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from langgraph.types import Checkpointer
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.multi_agent_chat.core.mcp_partition import (
fetch_mcp_connector_metadata_maps,
partition_mcp_tools_by_expert_route,
)
from app.agents.multi_agent_chat.core.registry.dependencies import (
build_registry_dependencies,
coerce_thread_id_for_registry,
)
from app.agents.multi_agent_chat.middleware.supervisor_stack import (
build_supervisor_middleware_stack,
)
from app.agents.multi_agent_chat.routing.supervisor_routing import (
build_supervisor_routing_tools,
)
from app.agents.multi_agent_chat.supervisor import build_supervisor_agent
from app.agents.new_chat.chat_deepagent import _map_connectors_to_searchable_types
from app.agents.new_chat.context import SurfSenseContextSchema
from app.agents.new_chat.feature_flags import get_flags
from app.agents.new_chat.filesystem_backends import build_backend_resolver
from app.agents.new_chat.filesystem_selection import FilesystemSelection
from app.agents.new_chat.tools.mcp_tool import load_mcp_tools
from app.db import ChatVisibility
logger = logging.getLogger(__name__)
async def _discover_connectors_and_doc_types(
    *,
    connector_service: Any | None,
    search_space_id: int,
    available_connectors: list[str] | None,
    available_document_types: list[str] | None,
) -> tuple[list[str] | None, list[str] | None]:
    """Resolve connector / document-type lists, consulting ``connector_service`` only for values the caller omitted.

    Discovery failures are logged and swallowed (best-effort): in that case the
    caller-provided values — possibly ``None`` — are returned unchanged.
    """
    resolved_connectors = available_connectors
    resolved_doc_types = available_document_types
    if connector_service is None:
        # Nothing to discover from — hand back whatever the caller supplied.
        return resolved_connectors, resolved_doc_types
    try:
        if resolved_connectors is None:
            raw = await connector_service.get_available_connectors(search_space_id)
            if raw:
                resolved_connectors = _map_connectors_to_searchable_types(raw)
        if resolved_doc_types is None:
            resolved_doc_types = await connector_service.get_available_document_types(search_space_id)
    except Exception as exc:
        logger.warning("Failed to discover available connectors/document types: %s", exc)
    return resolved_connectors, resolved_doc_types
async def _mcp_tools_by_expert_route(
    *,
    db_session: AsyncSession,
    search_space_id: int,
) -> dict[str, list[BaseTool]] | None:
    """Load all MCP tools for the search space and partition them by expert route key."""
    flat_tools = await load_mcp_tools(db_session, search_space_id)
    connector_ids, connector_names = await fetch_mcp_connector_metadata_maps(
        db_session, search_space_id
    )
    return partition_mcp_tools_by_expert_route(flat_tools, connector_ids, connector_names)
def _make_supervisor_routing_tools(
    llm: BaseChatModel,
    *,
    db_session: AsyncSession,
    search_space_id: int,
    user_id: str,
    thread_id: str | int | None,
    firecrawl_api_key: str | None,
    connector_service: Any | None,
    available_connectors: list[str] | None,
    available_document_types: list[str] | None,
    thread_visibility: ChatVisibility,
    mcp_tools_by_route: dict[str, list[BaseTool]] | None,
) -> list[BaseTool]:
    """Build the supervisor-facing routing tools on top of registry dependencies.

    Deliverables routes are included only when the thread id coerces to a
    valid registry id (i.e. the chat is thread-scoped).
    """
    dependencies = build_registry_dependencies(
        db_session=db_session,
        search_space_id=search_space_id,
        user_id=user_id,
        thread_id=thread_id,
        llm=llm,
        firecrawl_api_key=firecrawl_api_key,
        connector_service=connector_service,
        available_connectors=available_connectors,
        available_document_types=available_document_types,
        thread_visibility=thread_visibility,
    )
    thread_scoped = coerce_thread_id_for_registry(thread_id) is not None
    return build_supervisor_routing_tools(
        llm,
        registry_dependencies=dependencies,
        include_deliverables=thread_scoped,
        mcp_tools_by_route=mcp_tools_by_route,
        available_connectors=available_connectors,
        thread_visibility=thread_visibility,
    )
def _compile_supervisor_agent_sync(
    *,
    llm: BaseChatModel,
    routing_tools: list[BaseTool],
    checkpointer: Checkpointer | None,
    backend_resolver: Any,
    filesystem_mode: Any,
    search_space_id: int,
    user_id: str,
    thread_id: str | int | None,
    thread_visibility: ChatVisibility,
    anon_session_id: str | None,
    available_connectors: list[str] | None,
    available_document_types: list[str] | None,
    mentioned_document_ids: list[int] | None,
    max_input_tokens: int | None,
    citations_enabled: bool,
) -> Any:
    """Assemble the SurfSense middleware stack and compile the supervisor agent.

    CPU-heavy by design: callers are expected to run this via
    ``asyncio.to_thread`` so graph compilation does not block the event loop.
    """
    middleware_stack = build_supervisor_middleware_stack(
        llm=llm,
        tools=routing_tools,
        backend_resolver=backend_resolver,
        filesystem_mode=filesystem_mode,
        search_space_id=search_space_id,
        user_id=user_id,
        thread_id=thread_id,
        visibility=thread_visibility,
        anon_session_id=anon_session_id,
        available_connectors=available_connectors,
        available_document_types=available_document_types,
        mentioned_document_ids=mentioned_document_ids,
        max_input_tokens=max_input_tokens,
        flags=get_flags(),
    )
    return build_supervisor_agent(
        llm,
        tools=routing_tools,
        middleware=middleware_stack,
        checkpointer=checkpointer,
        thread_visibility=thread_visibility,
        context_schema=SurfSenseContextSchema,
        citations_enabled=citations_enabled,
    )
async def create_multi_agent_chat(
    llm: BaseChatModel,
    *,
    db_session: AsyncSession,
    search_space_id: int,
    user_id: str,
    checkpointer: Checkpointer | None = None,
    thread_id: str | int | None = None,
    firecrawl_api_key: str | None = None,
    connector_service: Any | None = None,
    available_connectors: list[str] | None = None,
    available_document_types: list[str] | None = None,
    thread_visibility: ChatVisibility = ChatVisibility.PRIVATE,
    include_mcp_tools: bool = True,
    filesystem_selection: FilesystemSelection | None = None,
    anon_session_id: str | None = None,
    mentioned_document_ids: list[int] | None = None,
    max_input_tokens: int | None = None,
    surfsense_stack: bool = True,
    citations_enabled: bool | None = None,
):
    """Build the full multi-agent chat graph (supervisor + expert subgraphs via routing tools).
    **Builtins** (:mod:`expert_agent.builtins`): registry-grouped **categories** (research, memory, deliverables).
    **Connectors** (:mod:`expert_agent.connectors`): **vendor integrations** — one subgraph per route in
    ``TOOL_NAMES_BY_CATEGORY`` (e.g. calendar, confluence, discord, dropbox, gmail, google_drive, luma, notion, onedrive, teams).
    MCP tools (via ``load_mcp_tools``) are partitioned inside this package and attached only
    to the matching expert subgraphs — not to the supervisor tool list as raw MCP calls. Inclusion matches
    ``app.agents.new_chat.tools.registry.build_tools_async``: all tools returned by ``load_mcp_tools`` are merged
    after partitioning (no extra inventory filter on MCP). Connector routing uses ``available_connectors``:
    pass explicitly, or provide ``connector_service`` so lists are resolved like
    ``create_surfsense_deep_agent`` (``get_available_connectors`` searchable types).
    Deliverables (thread-scoped reports, podcasts, etc.) are registered only when ``thread_id`` is set.
    When ``surfsense_stack`` is true (default), the supervisor uses the same SurfSense middleware shell as
    the main single-agent chat (KB priority/tree, filesystem, compaction, permissions, etc.) except
    ``SubAgentMiddleware`` / ``task``, since experts are separate graphs behind routing tools. When false,
    a bare supervisor is returned and no filesystem backend is resolved at all. Graph
    compilation runs in ``asyncio.to_thread`` so heavy CPU work does not block the event loop.
    ``citations_enabled``: when ``None``, defaults to ``True`` (same default as ``AgentConfig`` / main chat).
    """
    citations = True if citations_enabled is None else citations_enabled
    # Fill any connector / document-type lists the caller omitted.
    connectors, doc_types = await _discover_connectors_and_doc_types(
        connector_service=connector_service,
        search_space_id=search_space_id,
        available_connectors=available_connectors,
        available_document_types=available_document_types,
    )
    mcp_by_route: dict[str, list[BaseTool]] | None = None
    if include_mcp_tools:
        mcp_by_route = await _mcp_tools_by_expert_route(
            db_session=db_session, search_space_id=search_space_id
        )
    routing_tools = _make_supervisor_routing_tools(
        llm,
        db_session=db_session,
        search_space_id=search_space_id,
        user_id=user_id,
        thread_id=thread_id,
        firecrawl_api_key=firecrawl_api_key,
        connector_service=connector_service,
        available_connectors=connectors,
        available_document_types=doc_types,
        thread_visibility=thread_visibility,
        mcp_tools_by_route=mcp_by_route,
    )
    if not surfsense_stack:
        # Bare supervisor: return before resolving the filesystem backend —
        # this path never uses it (previously it was built and discarded).
        return build_supervisor_agent(
            llm,
            tools=routing_tools,
            checkpointer=checkpointer,
            thread_visibility=thread_visibility,
            citations_enabled=citations,
        )
    fs_sel = filesystem_selection or FilesystemSelection()
    backend_resolver = build_backend_resolver(fs_sel, search_space_id=search_space_id)
    # Middleware assembly + create_agent are CPU-heavy; run off the event loop.
    return await asyncio.to_thread(
        _compile_supervisor_agent_sync,
        llm=llm,
        routing_tools=routing_tools,
        checkpointer=checkpointer,
        backend_resolver=backend_resolver,
        filesystem_mode=fs_sel.mode,
        search_space_id=search_space_id,
        user_id=user_id,
        thread_id=thread_id,
        thread_visibility=thread_visibility,
        anon_session_id=anon_session_id,
        available_connectors=connectors,
        available_document_types=doc_types,
        mentioned_document_ids=mentioned_document_ids,
        max_input_tokens=max_input_tokens,
        citations_enabled=citations,
    )

View file

@ -1,11 +0,0 @@
"""SurfSense supervisor middleware (parity with the main single-agent chat, minus subagents)."""
from app.agents.multi_agent_chat.middleware.supervisor_stack import (
build_supervisor_middleware_stack,
parse_thread_id_for_action_log,
)
# Public API of the middleware package: the stack builder and its thread-id coercion helper.
__all__ = [
    "build_supervisor_middleware_stack",
    "parse_thread_id_for_action_log",
]

View file

@ -1,363 +0,0 @@
"""Supervisor middleware stack matching the main single-agent chat (no ``SubAgentMiddleware`` / ``task``)."""
from __future__ import annotations
import logging
from collections.abc import Sequence
from typing import Any
from deepagents.backends import StateBackend
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from deepagents.middleware.skills import SkillsMiddleware
from langchain.agents.middleware import (
LLMToolSelectorMiddleware,
ModelCallLimitMiddleware,
ModelFallbackMiddleware,
TodoListMiddleware,
ToolCallLimitMiddleware,
)
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.new_chat.feature_flags import AgentFeatureFlags, get_flags
from app.agents.new_chat.filesystem_selection import FilesystemMode
from app.agents.new_chat.middleware import (
ActionLogMiddleware,
AnonymousDocumentMiddleware,
BusyMutexMiddleware,
ClearToolUsesEdit,
DedupHITLToolCallsMiddleware,
DoomLoopMiddleware,
FileIntentMiddleware,
KnowledgeBasePersistenceMiddleware,
KnowledgePriorityMiddleware,
KnowledgeTreeMiddleware,
MemoryInjectionMiddleware,
NoopInjectionMiddleware,
OtelSpanMiddleware,
RetryAfterMiddleware,
SpillingContextEditingMiddleware,
SpillToBackendEdit,
SurfSenseFilesystemMiddleware,
ToolCallNameRepairMiddleware,
build_skills_backend_factory,
create_surfsense_compaction_middleware,
default_skills_sources,
)
from app.agents.new_chat.plugin_loader import (
PluginContext,
load_allowed_plugin_names_from_env,
load_plugin_middlewares,
)
from app.agents.new_chat.tools.registry import BUILTIN_TOOLS
from app.db import ChatVisibility
logger = logging.getLogger(__name__)
# Routing tools with heavy outputs — never prune via context editing when bound.
_SUPERVISOR_PRUNE_PROTECTED: frozenset[str] = frozenset(
    {
        "deliverables",
        "invalid",
        # Align with single-agent surfacing of costly connector reads if names overlap later.
        "read_email",
        "search_emails",
        "generate_report",
        "generate_resume",
        "generate_podcast",
        "generate_video_presentation",
        "generate_image",
    }
)


def _safe_exclude_tools_supervisor(tools: Sequence[BaseTool]) -> tuple[str, ...]:
    """Return the protected tool names that are actually bound in *tools*."""
    bound_names = {tool.name for tool in tools}
    protected: list[str] = []
    for candidate in _SUPERVISOR_PRUNE_PROTECTED:
        if candidate in bound_names:
            protected.append(candidate)
    return tuple(protected)
def parse_thread_id_for_action_log(thread_id: int | str | None) -> int | None:
    """Coerce *thread_id* to a numeric DB id, or ``None`` when it is not one.

    Numeric DB thread ids only — UUID-style strings return ``None`` so action
    logging is skipped (no FK row exists for them).
    """
    if thread_id is None:
        return None
    if isinstance(thread_id, int):
        return thread_id
    s = str(thread_id).strip()
    # ``isdecimal`` rather than ``isdigit``: ``isdigit`` also accepts characters
    # such as superscripts ("²") that ``int()`` rejects, which made this raise
    # ValueError instead of returning None.
    if s.isdecimal():
        return int(s)
    return None
def build_supervisor_middleware_stack(
    *,
    llm: BaseChatModel,
    tools: Sequence[BaseTool],
    backend_resolver: Any,
    filesystem_mode: FilesystemMode,
    search_space_id: int,
    user_id: str | None,
    thread_id: int | str | None,
    visibility: ChatVisibility,
    anon_session_id: str | None,
    available_connectors: list[str] | None,
    available_document_types: list[str] | None,
    mentioned_document_ids: list[int] | None,
    max_input_tokens: int | None,
    flags: AgentFeatureFlags | None = None,
) -> list[Any]:
    """Build middleware list for the multi-agent supervisor (parity with ``_build_compiled_agent_blocking`` minus subagents).

    Most entries below are double-gated: a per-feature flag AND the global
    ``disable_new_agent_stack`` kill switch. Disabled entries stay ``None`` and
    are filtered out of the returned list at the end.
    """
    flags = flags or get_flags()
    # Always-on: long-term memory injection scoped to this user / search space.
    memory_middleware = MemoryInjectionMiddleware(
        user_id=user_id,
        search_space_id=search_space_id,
        thread_visibility=visibility,
    )
    # Conversation compaction writes summarized context into the state backend.
    summarization_mw = create_surfsense_compaction_middleware(llm, StateBackend)
    _ = flags.enable_compaction_v2  # explicit flag read; v2 only gates NoopInjectionMiddleware below
    # Context editing: once ~55% of the input budget is consumed, spill older
    # tool outputs to the backend and then clear them; the last 5 are kept and
    # protected routing tools (see _SUPERVISOR_PRUNE_PROTECTED) are never pruned.
    context_edit_mw = None
    if (
        flags.enable_context_editing
        and not flags.disable_new_agent_stack
        and max_input_tokens
    ):
        spill_edit = SpillToBackendEdit(
            trigger=int(max_input_tokens * 0.55),
            clear_at_least=int(max_input_tokens * 0.15),
            keep=5,
            exclude_tools=_safe_exclude_tools_supervisor(tools),
            clear_tool_inputs=True,
        )
        clear_edit = ClearToolUsesEdit(
            trigger=int(max_input_tokens * 0.55),
            clear_at_least=int(max_input_tokens * 0.15),
            keep=5,
            exclude_tools=_safe_exclude_tools_supervisor(tools),
            clear_tool_inputs=True,
            placeholder="[cleared - older tool output trimmed for context]",
        )
        context_edit_mw = SpillingContextEditingMiddleware(
            edits=[spill_edit, clear_edit],
            backend_resolver=backend_resolver,
        )
    retry_mw = (
        RetryAfterMiddleware(max_retries=3)
        if flags.enable_retry_after and not flags.disable_new_agent_stack
        else None
    )
    # Model fallback to cheaper models; init failure is non-fatal.
    fallback_mw: ModelFallbackMiddleware | None = None
    if flags.enable_model_fallback and not flags.disable_new_agent_stack:
        try:
            fallback_mw = ModelFallbackMiddleware(
                "openai:gpt-4o-mini",
                "anthropic:claude-3-5-haiku-20241022",
            )
        except Exception:
            logger.warning("ModelFallbackMiddleware init failed; skipping.")
            fallback_mw = None
    # Hard ceilings on model / tool call counts per thread and per run.
    model_call_limit_mw = (
        ModelCallLimitMiddleware(
            thread_limit=120,
            run_limit=80,
            exit_behavior="end",
        )
        if flags.enable_model_call_limit and not flags.disable_new_agent_stack
        else None
    )
    tool_call_limit_mw = (
        ToolCallLimitMiddleware(
            thread_limit=300, run_limit=80, exit_behavior="continue"
        )
        if flags.enable_tool_call_limit and not flags.disable_new_agent_stack
        else None
    )
    noop_mw = (
        NoopInjectionMiddleware()
        if flags.enable_compaction_v2 and not flags.disable_new_agent_stack
        else None
    )
    # Repair hallucinated tool-call names against the set of names the
    # supervisor can actually call (bound tools + filesystem/todo builtins).
    repair_mw = None
    if flags.enable_tool_call_repair and not flags.disable_new_agent_stack:
        registered_names: set[str] = {t.name for t in tools}
        registered_names |= {
            "write_todos",
            "ls",
            "read_file",
            "write_file",
            "edit_file",
            "glob",
            "grep",
            "execute",
            # No ``task`` — multi-agent uses routing tools instead of SubAgentMiddleware.
        }
        repair_mw = ToolCallNameRepairMiddleware(
            registered_tool_names=registered_names,
            fuzzy_match_threshold=None,
        )
    doom_loop_mw = (
        DoomLoopMiddleware(threshold=3)
        if flags.enable_doom_loop and not flags.disable_new_agent_stack
        else None
    )
    # Action logging requires a numeric DB thread id (UUID threads have no FK row).
    thread_id_action_log = parse_thread_id_for_action_log(thread_id)
    action_log_mw: ActionLogMiddleware | None = None
    if (
        flags.enable_action_log
        and not flags.disable_new_agent_stack
        and thread_id_action_log is not None
    ):
        try:
            tool_defs_by_name = {td.name: td for td in BUILTIN_TOOLS}
            action_log_mw = ActionLogMiddleware(
                thread_id=thread_id_action_log,
                search_space_id=search_space_id,
                user_id=user_id,
                tool_definitions=tool_defs_by_name,
            )
        except Exception:  # pragma: no cover - defensive
            logger.warning(
                "ActionLogMiddleware init failed; running without it.",
                exc_info=True,
            )
            action_log_mw = None
    busy_mutex_mw: BusyMutexMiddleware | None = (
        BusyMutexMiddleware()
        if flags.enable_busy_mutex and not flags.disable_new_agent_stack
        else None
    )
    otel_mw: OtelSpanMiddleware | None = (
        OtelSpanMiddleware()
        if flags.enable_otel and not flags.disable_new_agent_stack
        else None
    )
    # Optional plugins from an env allowlist; loader failures never break the agent.
    plugin_middlewares: list[Any] = []
    if flags.enable_plugin_loader and not flags.disable_new_agent_stack:
        try:
            allowed_names = load_allowed_plugin_names_from_env()
            if allowed_names:
                plugin_middlewares = load_plugin_middlewares(
                    PluginContext.build(
                        search_space_id=search_space_id,
                        user_id=user_id,
                        thread_visibility=visibility,
                        llm=llm,
                    ),
                    allowed_plugin_names=allowed_names,
                )
        except Exception:  # pragma: no cover - defensive
            logger.warning(
                "Plugin loader failed; continuing without plugins.",
                exc_info=True,
            )
            plugin_middlewares = []
    # Skills backend is search-space-scoped only in CLOUD filesystem mode.
    skills_mw: SkillsMiddleware | None = None
    if flags.enable_skills and not flags.disable_new_agent_stack:
        try:
            skills_factory = build_skills_backend_factory(
                search_space_id=search_space_id
                if filesystem_mode == FilesystemMode.CLOUD
                else None,
            )
            skills_mw = SkillsMiddleware(
                backend=skills_factory,
                sources=default_skills_sources(),
            )
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning("SkillsMiddleware init failed; skipping: %s", exc)
            skills_mw = None
    names = {t.name for t in tools}
    # LLM tool selector only pays off with a large tool count (>30 bound tools).
    selector_mw: LLMToolSelectorMiddleware | None = None
    if (
        flags.enable_llm_tool_selector
        and not flags.disable_new_agent_stack
        and len(tools) > 30
    ):
        try:
            selector_mw = LLMToolSelectorMiddleware(
                model="openai:gpt-4o-mini",
                max_tools=12,
                always_include=[
                    n
                    for n in (
                        "research",
                        "memory",
                        "update_memory",
                        "get_connected_accounts",
                        "scrape_webpage",
                    )
                    if n in names
                ],
            )
        except Exception:
            logger.warning("LLMToolSelectorMiddleware init failed; skipping.")
            selector_mw = None
    # Final assembly. Several entries are inline-conditional on CLOUD filesystem
    # mode; disabled features are None and filtered in the return expression.
    deepagent_middleware = [
        busy_mutex_mw,
        otel_mw,
        TodoListMiddleware(),
        memory_middleware,
        AnonymousDocumentMiddleware(anon_session_id=anon_session_id)
        if filesystem_mode == FilesystemMode.CLOUD
        else None,
        KnowledgeTreeMiddleware(
            search_space_id=search_space_id,
            filesystem_mode=filesystem_mode,
            llm=llm,
        )
        if filesystem_mode == FilesystemMode.CLOUD
        else None,
        KnowledgePriorityMiddleware(
            llm=llm,
            search_space_id=search_space_id,
            filesystem_mode=filesystem_mode,
            available_connectors=available_connectors,
            available_document_types=available_document_types,
            mentioned_document_ids=mentioned_document_ids,
        ),
        FileIntentMiddleware(llm=llm),
        SurfSenseFilesystemMiddleware(
            backend=backend_resolver,
            filesystem_mode=filesystem_mode,
            search_space_id=search_space_id,
            created_by_id=user_id,
            thread_id=thread_id,
        ),
        KnowledgeBasePersistenceMiddleware(
            search_space_id=search_space_id,
            created_by_id=user_id,
            filesystem_mode=filesystem_mode,
        )
        if filesystem_mode == FilesystemMode.CLOUD
        else None,
        skills_mw,
        selector_mw,
        model_call_limit_mw,
        tool_call_limit_mw,
        context_edit_mw,
        summarization_mw,
        noop_mw,
        retry_mw,
        fallback_mw,
        repair_mw,
        doom_loop_mw,
        action_log_mw,
        PatchToolCallsMiddleware(),
        DedupHITLToolCallsMiddleware(agent_tools=list(tools)),
        *plugin_middlewares,
        AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"),
    ]
    return [m for m in deepagent_middleware if m is not None]

View file

@ -1,13 +0,0 @@
"""Supervisor routing: domain-agent wrappers and composed routing tool lists."""
from app.agents.multi_agent_chat.routing.domain_routing_spec import DomainRoutingSpec
from app.agents.multi_agent_chat.routing.from_domain_agents import routing_tools_from_specs
from app.agents.multi_agent_chat.routing.supervisor_routing import build_supervisor_routing_tools
from app.agents.multi_agent_chat.core.invocation import extract_last_assistant_text
# Public API of the routing package: spec type, composers, and the shared text extractor.
__all__ = [
    "DomainRoutingSpec",
    "build_supervisor_routing_tools",
    "extract_last_assistant_text",
    "routing_tools_from_specs",
]

View file

@ -1,22 +0,0 @@
"""Declarative description of one supervisor routing tool → domain agent."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class DomainRoutingSpec:
    """One supervisor-facing routing ``@tool`` bound to a compiled domain graph.
    ``curated_context`` is optional for **any** route: when set, the routing tool prepends its return
    value into the child task via :func:`~app.agents.multi_agent_chat.core.delegation.compose_child_task`.
    :func:`build_supervisor_routing_tools` does not pass it (all routes treated the same); use when building specs manually.
    """
    # Name exposed to the supervisor for delegating into this domain.
    tool_name: str
    # Tool description shown to the supervisor model.
    description: str
    # Compiled domain-agent graph; typed ``Any`` to avoid importing graph types here.
    domain_agent: Any
    # Optional task -> extra-context hook; ``None`` means nothing is prepended.
    curated_context: Callable[[str], str | None] | None = None

View file

@ -1,123 +0,0 @@
"""LangChain ``@tool`` wrappers that invoke compiled domain-agent graphs (supervisor-facing only)."""
from __future__ import annotations
from collections.abc import Sequence
import json
from typing import Any
from langchain_core.tools import BaseTool, tool
from app.agents.multi_agent_chat.core.delegation import compose_child_task
from app.agents.multi_agent_chat.core.invocation import extract_last_assistant_text
from app.agents.multi_agent_chat.routing.domain_routing_spec import DomainRoutingSpec
_ALLOWED_STATUSES = {"success", "partial", "blocked", "error"}
_REQUIRED_KEYS = {
"status",
"action_summary",
"evidence",
"next_step",
"missing_fields",
"assumptions",
}
def _fallback_payload(spec: DomainRoutingSpec, reason: str, raw_text: str) -> dict[str, Any]:
preview = raw_text[:800]
return {
"status": "error",
"action_summary": "Domain agent output failed JSON contract validation.",
"evidence": {
"route_tool": spec.tool_name,
"validation_error": reason,
"raw_output_preview": preview,
},
"next_step": (
"Re-delegate with a strict reminder to return exactly one JSON object "
"matching the output_contract."
),
"missing_fields": None,
"assumptions": None,
}
def _validate_contract_payload(payload: dict[str, Any]) -> str | None:
    """Check *payload* against the expert output contract.

    Returns ``None`` when the payload is valid, otherwise a short reason string
    describing the first violation found (checks run in a fixed order).
    """
    absent = sorted(_REQUIRED_KEYS - set(payload))
    if absent:
        return f"missing required keys: {', '.join(absent)}"
    status = payload.get("status")
    if status not in _ALLOWED_STATUSES:
        return "invalid status value"
    summary = payload.get("action_summary")
    if not (isinstance(summary, str) and summary.strip()):
        return "action_summary must be a non-empty string"
    if not isinstance(payload.get("evidence"), dict):
        return "evidence must be an object"
    step = payload.get("next_step")
    if status == "success":
        # A finished task carries no follow-up and no missing inputs.
        if step is not None:
            return "next_step must be null when status=success"
        if payload.get("missing_fields") is not None:
            return "missing_fields must be null when status=success"
    elif not (isinstance(step, str) and step.strip()):
        return "next_step must be a non-empty string for non-success statuses"

    def _invalid_str_list(value: Any) -> bool:
        # Valid shapes: None, or a list of non-empty strings.
        if value is None:
            return False
        if not isinstance(value, list):
            return True
        return any(not isinstance(item, str) or not item.strip() for item in value)

    if _invalid_str_list(payload.get("missing_fields")):
        return "missing_fields must be null or a list of non-empty strings"
    if _invalid_str_list(payload.get("assumptions")):
        return "assumptions must be null or a list of non-empty strings"
    return None
def _normalize_domain_output(spec: DomainRoutingSpec, raw_text: str) -> str:
    """Parse and validate a domain agent's raw reply; return canonical contract JSON.

    Any parse or contract failure is replaced by a serialized fallback payload
    (status="error") so the supervisor never sees malformed output.
    """
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        return json.dumps(_fallback_payload(spec, f"invalid JSON: {exc.msg}", raw_text))
    if not isinstance(parsed, dict):
        return json.dumps(_fallback_payload(spec, "top-level JSON must be an object", raw_text))
    reason = _validate_contract_payload(parsed)
    if reason:
        return json.dumps(_fallback_payload(spec, reason, raw_text))
    # Re-serialize rather than echo raw_text: strips surrounding noise/whitespace.
    return json.dumps(parsed)
def _routing_tool_for_spec(spec: DomainRoutingSpec) -> BaseTool:
    # Build one async ``@tool`` closure over *spec*: compose the child task,
    # invoke the compiled domain graph, then normalize its reply to contract JSON.
    @tool(spec.tool_name, description=spec.description)
    async def _route(task: str) -> str:
        # Optional per-route curated context is prepended into the child task.
        curated = spec.curated_context(task) if spec.curated_context else None
        content = compose_child_task(task, curated_context=curated)
        result = await spec.domain_agent.ainvoke(
            {"messages": [{"role": "user", "content": content}]},
        )
        # Extract the assistant's final text and enforce the JSON output contract.
        return _normalize_domain_output(spec, extract_last_assistant_text(result))
    return _route
def routing_tools_from_specs(specs: Sequence[DomainRoutingSpec]) -> list[BaseTool]:
"""Build one supervisor-facing routing ``@tool`` per :class:`DomainRoutingSpec`."""
return [_routing_tool_for_spec(spec) for spec in specs]

View file

@ -1,57 +0,0 @@
"""Gate supervisor routing tools by connected searchable connector types (aligned with main chat KB).
When ``available_connectors`` is ``None``, all routes are emitted (caller did not pass an inventory).
When provided, a connector route is emitted only if at least one required searchable type is present.
MCP tools are filtered upstream in :func:`~app.agents.multi_agent_chat.core.mcp_partition.partition_mcp_tools_by_expert_route`
so merges only include tools for connected accounts.
"""
from __future__ import annotations
# Route tool_name → searchable connector / doc-type strings (same family as
# ``chat_deepagent._CONNECTOR_TYPE_TO_SEARCHABLE`` values in ``available_connectors``).
# Route tool_name → searchable connector / doc-type strings (same family as
# ``chat_deepagent._CONNECTOR_TYPE_TO_SEARCHABLE`` values in ``available_connectors``).
# A route is emitted when ANY of its listed types is present (see include_connector_route).
_ROUTE_REQUIRES_ANY: dict[str, frozenset[str]] = {
    "calendar": frozenset(
        {"GOOGLE_CALENDAR_CONNECTOR", "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"}
    ),
    "confluence": frozenset({"CONFLUENCE_CONNECTOR"}),
    "discord": frozenset({"DISCORD_CONNECTOR"}),
    "dropbox": frozenset({"DROPBOX_FILE"}),
    "gmail": frozenset({"GOOGLE_GMAIL_CONNECTOR", "COMPOSIO_GMAIL_CONNECTOR"}),
    "google_drive": frozenset(
        {"GOOGLE_DRIVE_FILE", "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"}
    ),
    "luma": frozenset({"LUMA_CONNECTOR"}),
    "notion": frozenset({"NOTION_CONNECTOR"}),
    "onedrive": frozenset({"ONEDRIVE_FILE"}),
    "teams": frozenset({"TEAMS_CONNECTOR"}),
    # MCP-only supervisor routes (see ``core.mcp_partition.MCP_ONLY_ROUTE_KEYS_IN_ORDER``).
    "linear": frozenset({"LINEAR_CONNECTOR"}),
    "slack": frozenset({"SLACK_CONNECTOR"}),
    "jira": frozenset({"JIRA_CONNECTOR"}),
    "clickup": frozenset({"CLICKUP_CONNECTOR"}),
    "airtable": frozenset({"AIRTABLE_CONNECTOR"}),
    # generic_mcp route intentionally disabled for now.
    # "generic_mcp": frozenset({"MCP_CONNECTOR"}),
}
def include_connector_route(
    route_key: str,
    available_connectors: list[str] | None,
) -> bool:
    """Return whether to register this connector route on the supervisor.

    When ``available_connectors`` is ``None`` the caller supplied no inventory,
    so legacy behaviour applies and every route is emitted. Builtin routes
    (research, memory, deliverables) have no entry in ``_ROUTE_REQUIRES_ANY``
    and are always included. Connector-backed routes require at least one of
    their searchable connector types to appear in ``available_connectors``.
    """
    if available_connectors is None:
        return True
    required = _ROUTE_REQUIRES_ANY.get(route_key)
    if required is None:
        return True
    # Emit the route iff any required searchable type is connected.
    return not required.isdisjoint(available_connectors)

View file

@ -1,327 +0,0 @@
"""Compose domain agents + tool lists into supervisor routing tools (one ``@tool`` per category)."""
from __future__ import annotations
from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.core.mcp_partition import MCP_ONLY_ROUTE_KEYS_IN_ORDER
from app.agents.multi_agent_chat.expert_agent.builtins.deliverables import (
build_deliverables_domain_agent,
build_deliverables_tools,
)
from app.agents.multi_agent_chat.expert_agent.builtins.memory import (
build_memory_domain_agent,
build_memory_tools,
)
from app.agents.multi_agent_chat.expert_agent.builtins.research import (
build_research_domain_agent,
build_research_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.calendar import (
build_calendar_domain_agent,
build_calendar_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.confluence import (
build_confluence_domain_agent,
build_confluence_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.discord import (
build_discord_domain_agent,
build_discord_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.dropbox import (
build_dropbox_domain_agent,
build_dropbox_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.gmail import (
build_gmail_domain_agent,
build_gmail_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.google_drive import (
build_google_drive_domain_agent,
build_google_drive_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.luma import (
build_luma_domain_agent,
build_luma_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.notion import (
build_notion_domain_agent,
build_notion_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.onedrive import (
build_onedrive_domain_agent,
build_onedrive_tools,
)
from app.agents.multi_agent_chat.expert_agent.connectors.teams import (
build_teams_domain_agent,
build_teams_tools,
)
from app.agents.multi_agent_chat.expert_agent.mcp_bridge import (
build_mcp_route_domain_agent,
)
from app.agents.multi_agent_chat.routing.domain_routing_spec import DomainRoutingSpec
from app.agents.multi_agent_chat.routing.from_domain_agents import (
routing_tools_from_specs,
)
from app.agents.multi_agent_chat.routing.route_connector_gate import (
include_connector_route,
)
from app.db import ChatVisibility
# Route key → supervisor-facing tool description for MCP-only routes (no
# registry-backed expert; the subgraph is built from partitioned MCP tools alone).
_MCP_ONLY_ROUTE_DESCRIPTIONS: dict[str, str] = {
    "linear": (
        "Use for Linear issue/project work: find/create issues, update status/assignees, review project progress, and inspect cycles."
    ),
    "slack": (
        "Use for Slack channel communication: read channel/thread history, summarize conversations, and post replies."
    ),
    "jira": (
        "Use for Jira issue/project workflows: search issues, inspect fields, update tickets, and move work through workflow states."
    ),
    "clickup": (
        "Use for ClickUp task management: find tasks/lists, update task fields, and track execution progress."
    ),
    "airtable": (
        "Use for Airtable structured data operations: locate bases/tables and create/read/update records."
    ),
    # generic_mcp intentionally disabled for now.
    # "generic_mcp": (
    #     "Use as a fallback for custom connected app tasks not covered by a named specialist. "
    #     "Do not use if another specialist clearly matches."
    # ),
}
def _memory_route_description(thread_visibility: ChatVisibility | None) -> str:
    """Memory-route description: team wording for shared (SEARCH_SPACE) threads, user wording otherwise."""
    shared_thread = thread_visibility == ChatVisibility.SEARCH_SPACE
    return (
        "Use for storing durable team memory: shared team preferences, conventions, and long-lived team facts."
        if shared_thread
        else "Use for storing durable user memory: personal preferences, instructions, and long-lived user facts."
    )
def build_supervisor_routing_tools(
    llm: BaseChatModel,
    *,
    registry_dependencies: dict[str, Any] | None = None,
    include_deliverables: bool = True,
    mcp_tools_by_route: dict[str, list[BaseTool]] | None = None,
    available_connectors: list[str] | None = None,
    thread_visibility: ChatVisibility | None = None,
) -> list[BaseTool]:
    """Build supervisor routing tools: builtins first, then connector experts (same pattern for all).
    Requires ``registry_dependencies`` to produce any routing tools; otherwise returns an empty list.
    Pass ``registry_dependencies`` from
    :func:`app.agents.multi_agent_chat.core.registry.build_registry_dependencies`
    for builtins (**research**, **memory**, **deliverables** when ``include_deliverables``) and every
    registry-backed connector route.
    ``mcp_tools_by_route`` maps route keys to MCP tools merged into the matching expert subgraph.
    When ``available_connectors`` is set (searchable connector strings, same shape as the main chat agent),
    a connector-backed route is registered only if its required searchable connector type is available.
    """
    if registry_dependencies is None:
        return routing_tools_from_specs([])
    mcp = mcp_tools_by_route or {}
    specs: list[DomainRoutingSpec] = []
    # Builtin experts: always registered (never gated on available_connectors).
    research_tools = build_research_tools(registry_dependencies)
    research_agent = build_research_domain_agent(llm, research_tools)
    specs.append(
        DomainRoutingSpec(
            tool_name="research",
            description=(
                "Use for external research: find sources on the web, extract evidence, and answer documentation questions."
            ),
            domain_agent=research_agent,
        ),
    )
    # Memory wording (user vs team) follows thread visibility.
    memory_tools = build_memory_tools(registry_dependencies)
    memory_agent = build_memory_domain_agent(
        llm,
        memory_tools,
        thread_visibility=thread_visibility,
    )
    specs.append(
        DomainRoutingSpec(
            tool_name="memory",
            description=_memory_route_description(thread_visibility),
            domain_agent=memory_agent,
        ),
    )
    if include_deliverables:
        deliverables_tools = build_deliverables_tools(registry_dependencies)
        deliverables_agent = build_deliverables_domain_agent(llm, deliverables_tools)
        specs.append(
            DomainRoutingSpec(
                tool_name="deliverables",
                description=(
                    "Use for deliverables and shareable artifacts: generated reports, podcasts, "
                    "video presentations, resumes, and images—not for routine lookups or single small edits elsewhere."
                ),
                domain_agent=deliverables_agent,
            ),
        )
    # Connector experts (registry-backed + optional MCP merge): alphabetical by route key.
    if include_connector_route("calendar", available_connectors):
        calendar_agent = build_calendar_domain_agent(
            llm,
            build_calendar_tools(registry_dependencies) + mcp.get("calendar", []),
        )
        specs.append(
            DomainRoutingSpec(
                tool_name="calendar",
                description=(
                    "Use for calendar planning and scheduling: check availability, read event details, create events, and update events."
                ),
                domain_agent=calendar_agent,
            ),
        )
    if include_connector_route("confluence", available_connectors):
        confluence_tools = build_confluence_tools(registry_dependencies)
        confluence_agent = build_confluence_domain_agent(llm, confluence_tools)
        specs.append(
            DomainRoutingSpec(
                tool_name="confluence",
                description=(
                    "Use for Confluence knowledge pages: search/read existing pages, create new pages, and update page content."
                ),
                domain_agent=confluence_agent,
            ),
        )
    if include_connector_route("discord", available_connectors):
        discord_tools = build_discord_tools(registry_dependencies)
        discord_agent = build_discord_domain_agent(llm, discord_tools + mcp.get("discord", []))
        specs.append(
            DomainRoutingSpec(
                tool_name="discord",
                description=(
                    "Use for Discord communication: read channel/thread messages, gather context, and send replies."
                ),
                domain_agent=discord_agent,
            ),
        )
    if include_connector_route("dropbox", available_connectors):
        dropbox_tools = build_dropbox_tools(registry_dependencies)
        dropbox_agent = build_dropbox_domain_agent(llm, dropbox_tools)
        specs.append(
            DomainRoutingSpec(
                tool_name="dropbox",
                description=(
                    "Use for Dropbox file storage tasks: browse folders, read files, and manage Dropbox file content."
                ),
                domain_agent=dropbox_agent,
            ),
        )
    if include_connector_route("gmail", available_connectors):
        gmail_agent = build_gmail_domain_agent(
            llm,
            build_gmail_tools(registry_dependencies) + mcp.get("gmail", []),
        )
        specs.append(
            DomainRoutingSpec(
                tool_name="gmail",
                description=(
                    "Use for Gmail inbox actions: search/read emails, draft or update replies, send messages, and trash emails."
                ),
                domain_agent=gmail_agent,
            ),
        )
    if include_connector_route("google_drive", available_connectors):
        google_drive_tools = build_google_drive_tools(registry_dependencies)
        google_drive_agent = build_google_drive_domain_agent(llm, google_drive_tools)
        specs.append(
            DomainRoutingSpec(
                tool_name="google_drive",
                description=(
                    "Use for Google Drive document/file tasks: locate files, inspect content, and manage Drive files or folders."
                ),
                domain_agent=google_drive_agent,
            ),
        )
    if include_connector_route("luma", available_connectors):
        luma_tools = build_luma_tools(registry_dependencies)
        luma_agent = build_luma_domain_agent(llm, luma_tools + mcp.get("luma", []))
        specs.append(
            DomainRoutingSpec(
                tool_name="luma",
                description=(
                    "Use for Luma event operations: list events, inspect event details, and create new events."
                ),
                domain_agent=luma_agent,
            ),
        )
    if include_connector_route("notion", available_connectors):
        notion_tools = build_notion_tools(registry_dependencies)
        notion_agent = build_notion_domain_agent(llm, notion_tools)
        specs.append(
            DomainRoutingSpec(
                tool_name="notion",
                description=(
                    "Use for Notion workspace pages: create pages, update page content, and delete pages."
                ),
                domain_agent=notion_agent,
            ),
        )
    if include_connector_route("onedrive", available_connectors):
        onedrive_tools = build_onedrive_tools(registry_dependencies)
        onedrive_agent = build_onedrive_domain_agent(llm, onedrive_tools)
        specs.append(
            DomainRoutingSpec(
                tool_name="onedrive",
                description=(
                    "Use for OneDrive file storage tasks: browse folders, read files, and manage OneDrive file content."
                ),
                domain_agent=onedrive_agent,
            ),
        )
    if include_connector_route("teams", available_connectors):
        teams_tools = build_teams_tools(registry_dependencies)
        teams_agent = build_teams_domain_agent(llm, teams_tools + mcp.get("teams", []))
        specs.append(
            DomainRoutingSpec(
                tool_name="teams",
                description=(
                    "Use for Microsoft Teams communication: read channel/thread messages, gather context, and post replies."
                ),
                domain_agent=teams_agent,
            ),
        )
    # MCP-only routes: registered only when partitioned MCP tools exist for the
    # route AND the connector gate admits it (fixed ordering from core.mcp_partition).
    for route_key in MCP_ONLY_ROUTE_KEYS_IN_ORDER:
        only_mcp = mcp.get(route_key) or []
        if not only_mcp:
            continue
        if not include_connector_route(route_key, available_connectors):
            continue
        desc = _MCP_ONLY_ROUTE_DESCRIPTIONS.get(
            route_key,
            f"Use for {route_key} tasks related to that system's core work objects and workflows.",
        )
        specs.append(
            DomainRoutingSpec(
                tool_name=route_key,
                description=desc,
                domain_agent=build_mcp_route_domain_agent(llm, route_key, only_mcp),
            ),
        )
    return routing_tools_from_specs(specs)

View file

@ -1,5 +0,0 @@
"""Supervisor agent graph only; supply routing ``tools`` from ``build_supervisor_routing_tools``."""
from app.agents.multi_agent_chat.supervisor.graph import build_supervisor_agent
__all__ = ["build_supervisor_agent"]

View file

@ -1,51 +0,0 @@
"""Compile the supervisor agent graph (LangChain ``create_agent`` + caller routing tools)."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from langchain.agents import create_agent
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from langgraph.types import Checkpointer
from app.agents.multi_agent_chat.supervisor.prompt_assembly import (
build_supervisor_system_prompt,
)
def build_supervisor_agent(
    llm: BaseChatModel,
    *,
    tools: Sequence[BaseTool],
    checkpointer: Checkpointer | None = None,
    thread_visibility: Any | None = None,
    middleware: Sequence[Any] | None = None,
    context_schema: Any | None = None,
    citations_enabled: bool = True,
):
    """Compile the supervisor **agent** (graph). ``tools`` = output of ``build_supervisor_routing_tools``.

    Args:
        llm: Chat model driving the supervisor.
        tools: Routing tools (one per specialist) from ``build_supervisor_routing_tools``.
        checkpointer: Optional checkpointer forwarded to ``create_agent``.
        thread_visibility: Forwarded to prompt assembly (team vs private wording).
        middleware: Optional middleware sequence; forwarded only when not ``None``.
        context_schema: Optional context schema; forwarded only when not ``None``.
        citations_enabled: Whether the citation block is injected into the prompt.
    """
    system_prompt = build_supervisor_system_prompt(
        tools,
        thread_visibility=thread_visibility,
        citations_enabled=citations_enabled,
    )
    kwargs: dict[str, Any] = {
        "system_prompt": system_prompt,
        "tools": list(tools),
        "checkpointer": checkpointer,
    }
    # Forward the optional settings only when the caller actually supplied them.
    if middleware is not None:
        kwargs["middleware"] = list(middleware)
    if context_schema is not None:
        kwargs["context_schema"] = context_schema
    agent = create_agent(llm, **kwargs)
    # NOTE(review): the recursion_limit / ls_integration config is applied only when
    # middleware or context_schema was supplied — confirm the plain path should not
    # also carry the raised recursion_limit.
    if middleware is not None or context_schema is not None:
        return agent.with_config(
            {
                "recursion_limit": 10_000,
                "metadata": {"ls_integration": "multi_agent_supervisor"},
            }
        )
    return agent

View file

@ -1,128 +0,0 @@
"""Supervisor system prompt: template load, shared agent-identity injection, specialist list."""
from __future__ import annotations
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import Any
from langchain_core.tools import BaseTool
import app.agents.multi_agent_chat.supervisor as supervisor_pkg
from app.agents.multi_agent_chat.core.prompts import read_prompt_md
from app.agents.new_chat.prompts.composer import _build_citation_block, _read_fragment
from app.db import ChatVisibility
# Phrase substituted into the main-chat memory-protocol fragment, replacing the
# direct tool mention ("call update_memory") with delegation to the memory specialist.
_MEMORY_SPECIALIST_PHRASE = "invoke the **memory** specialist"
# Specialists listed in the prompt even when no matching routing tool is registered.
_BUILTIN_SPECIALISTS: frozenset[str] = frozenset({"research", "memory", "deliverables"})
# Specialist name → one-line capability blurb rendered into the prompt bullet list.
_SPECIALIST_CAPABILITIES: dict[str, str] = {
    "research": "external research: web lookup, source gathering, and SurfSense documentation help.",
    "memory": "save durable long-lived memory items.",
    "deliverables": "deliverables and shareable artifacts: reports, podcasts, video presentations, resumes, and images.",
    "gmail": "email inbox actions: search/read emails, draft updates, send messages, and trash emails.",
    "calendar": "scheduling actions: check availability, inspect events, create events, and update events.",
    "google_drive": "Drive file/document actions: locate files, inspect content, and manage files/folders.",
    "notion": "Notion page actions: create pages, update content, and delete pages.",
    "confluence": "Confluence page actions: find/read pages and create/update pages.",
    "dropbox": "Dropbox file storage actions: browse folders, read files, and manage file content.",
    "onedrive": "OneDrive file storage actions: browse folders, read files, and manage file content.",
    "discord": "Discord communication actions: read channels/threads and post replies.",
    "teams": "Microsoft Teams communication actions: read channels/threads and post replies.",
    "luma": "Luma event actions: list events, inspect event details, and create events.",
    "linear": "Linear workflow actions: search/update issues and inspect projects/cycles.",
    "jira": "Jira workflow actions: search/update issues and manage workflow transitions.",
    "clickup": "ClickUp workflow actions: find/update tasks and lists.",
    "airtable": "Airtable data actions: locate bases/tables and create/read/update records.",
    "slack": "Slack communication actions: read channel/thread history and post replies.",
    # generic_mcp specialist intentionally disabled for now.
    # "generic_mcp": "handle tasks through user-defined custom app integration tools not covered above.",
}
# Listing order in the prompt = insertion order of the capabilities mapping.
_SPECIALIST_ORDER: tuple[str, ...] = tuple(_SPECIALIST_CAPABILITIES.keys())
def _normalize_chat_visibility(thread_visibility: Any | None) -> ChatVisibility:
    """Coerce loosely-typed visibility (enum, enum-like, string, or None) to a ChatVisibility member."""
    if thread_visibility is None:
        return ChatVisibility.PRIVATE
    if thread_visibility == ChatVisibility.SEARCH_SPACE:
        return ChatVisibility.SEARCH_SPACE
    # Accept enum-likes and plain strings: compare on the upper-cased value.
    candidate = getattr(thread_visibility, "value", thread_visibility)
    if str(candidate).upper() == "SEARCH_SPACE":
        return ChatVisibility.SEARCH_SPACE
    return ChatVisibility.PRIVATE
def _identity_fragment_key(thread_visibility: Any | None) -> str:
    """``private`` / ``team`` suffix for ``agent_*`` and ``memory_protocol_*`` fragments."""
    if _normalize_chat_visibility(thread_visibility) == ChatVisibility.SEARCH_SPACE:
        return "team"
    return "private"
def _compose_identity_memory_citations(
    *,
    thread_visibility: Any | None,
    citations_enabled: bool,
) -> str:
    """Main-chat identity, memory protocol, and citation fragments (supervisor slice only)."""
    fragment_key = _identity_fragment_key(thread_visibility)
    resolved_today = datetime.now(UTC).date().isoformat()
    intro = _read_fragment(f"base/agent_{fragment_key}.md")
    if intro:
        # The identity fragment carries a {resolved_today} placeholder.
        intro = intro.format(resolved_today=resolved_today)
    protocol = _read_fragment(f"base/memory_protocol_{fragment_key}.md")
    # Redirect the main-chat tool mention to this package's memory specialist.
    protocol = protocol.replace("call update_memory", _MEMORY_SPECIALIST_PHRASE)
    citation_block = _build_citation_block(citations_enabled)
    tail = f"<system_instruction>\n{protocol}\n\n</system_instruction>\n" + citation_block
    parts = (intro.strip(), tail.strip())
    return "\n\n".join(part for part in parts if part)
def _memory_specialist_capability(thread_visibility: Any | None) -> str:
vis = str(getattr(thread_visibility, "value", thread_visibility)).upper()
if vis == "SEARCH_SPACE":
return "team memory actions: save shared team preferences, conventions, and long-lived team facts."
return "user memory actions: save personal preferences, instructions, and long-lived user facts."
def _specialists_markdown(
    tools: Sequence[BaseTool],
    *,
    thread_visibility: Any | None,
) -> str:
    """Render the ordered bullet list of specialists available this session."""
    registered = {
        tool.name for tool in tools if isinstance(getattr(tool, "name", None), str)
    }
    capabilities = dict(_SPECIALIST_CAPABILITIES)
    # Memory wording (user vs team) depends on thread visibility.
    capabilities["memory"] = _memory_specialist_capability(thread_visibility)
    bullet_lines = [
        f"- {name}: {capabilities[name]}"
        for name in _SPECIALIST_ORDER
        if name in _BUILTIN_SPECIALISTS or name in registered
    ]
    return "\n".join(bullet_lines)
def build_supervisor_system_prompt(
    tools: Sequence[BaseTool],
    *,
    thread_visibility: Any | None,
    citations_enabled: bool,
) -> str:
    """Load ``supervisor_prompt.md`` and fill placeholders."""
    template = read_prompt_md(supervisor_pkg.__name__, "supervisor_prompt")
    specialists_block = _specialists_markdown(tools, thread_visibility=thread_visibility)
    base_injection = _compose_identity_memory_citations(
        thread_visibility=thread_visibility,
        citations_enabled=citations_enabled,
    )
    prompt = template.replace("{{AVAILABLE_SPECIALISTS_LIST}}", specialists_block)
    return prompt.replace("{{SUPERVISOR_BASE_INJECTION}}", base_injection)

View file

@ -1,67 +0,0 @@
{{SUPERVISOR_BASE_INJECTION}}
<supervisor_role>
In this **multi-agent** session you also **coordinate specialists** (listed below): call a specialist only when their domain matches the need; give each call a compact, outcome-focused task; merge structured results into one clear user-facing reply. When you can satisfy the turn with your own tools and reasoning, do so without delegating.
</supervisor_role>
<available_specialists>
Use only the specialists listed below.
{{AVAILABLE_SPECIALISTS_LIST}}
</available_specialists>
<delegation_policy>
1) Delegate when the request clearly belongs to a specialist's capabilities.
2) Answer directly when no expert tool is needed.
3) For multi-domain work, decompose into sequential expert calls (or parallel only when independent).
4) Do not call a specialist "just in case". Every delegation must have a clear purpose.
5) Specialists are best for **one clear step at a time**—for example “find this,” “show that record,” “make this one change.” Do **not** hand them an entire “analyze everything and write me a trends report” brief in one go.
6) When the user wants **big-picture synthesis**—patterns across lots of items, comparisons across time, or an executive-style overview—**you** split the work: several **small** asks to whoever actually holds that information (each with a clear cap: how many items, how far back, which fields), then **you** combine the answers into one clear reply. If they need a **deliverable**—a real **artifact** others can read, hear, or watch (report, slide-style video, podcast, resume, image)—delegate to the **deliverables** specialist. Do not ask other specialists to replace that: their job is smaller steps (lookups and targeted changes), not producing the final artifact.
7) Each specialist answers in a **single short structured reply** (no extra chatter after it). Ask them only for what that reply can reasonably hold. If the user needs a long narrative or full report, **you** combine steps—or use the **deliverables** specialist—not one overloaded ask.
8) Prefer **a few clear, small asks** over one huge vague ask that invites guessing, cut-off answers, or broken replies.
</delegation_policy>
<task_writing_policy>
When delegating to a specialist, pass a compact but complete task that includes:
- the **outcome** they should produce, in **your own words** as clear instructions (do **not** paste or forward the user's message verbatim),
- concrete limits (dates, names, “last N items,” which details matter),
- how you will judge success,
- any identifiers or links the user already gave.
When asking for lists or searches, always say **how many** items at most and **which details** you need back.
Never pass implementation chatter. Pass only actionable instructions.
Each delegation should sound like **one clear action** (or two that belong together), not a full project brief—unless you are intentionally speaking to **research** or to **deliverables** for a **deliverable artifact** (report, slide-style video, podcast, resume, image).
</task_writing_policy>
<expert_output_contract_policy>
Every specialist returns **one structured reply** in a fixed layout. Treat it like a small form, not prose. It includes:
- **outcome**: succeeded, partly done, blocked, or failed
- **short summary** of what they did
- **proof**: what they actually saw or changed (when relevant)
- **what to do next** if they are not done
- **what you must ask the user** if something was missing
- **what they assumed** if they had to fill a gap
How to use it:
1) **Succeeded**: only treat it as done if the **proof** backs it up.
2) **Partly done**: use what they proved, then follow their **what to do next**.
3) **Blocked**: do not blindly retry; ask the user only what they said was missing (or pick from options they listed).
4) **Failed**: do not pretend it worked; either retry with a clearer small ask or explain honestly and follow their suggested recovery.
5) If the reply is missing, garbled, or contradicts itself, treat it as failed, do not invent facts, and recover with a safer smaller ask or a question to the user.
</expert_output_contract_policy>
<clarification_policy>
Ask a concise clarifying question only when a missing detail blocks execution.
If one reasonable default is safe and obvious, use it and state the assumption.
</clarification_policy>
<synthesis_policy>
After expert calls, produce one coherent final answer:
- what was done,
- key results/artifacts,
- unresolved items and the next best step.
- include assumptions only when they affected outcomes.
- when multiple experts are used, merge outputs into one user-facing narrative (do not paste their raw structured reply verbatim).
Never claim an action succeeded unless their reply includes proof that matches what you claim.
</synthesis_policy>