multi_agent_chat/subagents: HITL umbrella + ToolKind rename

This commit is contained in:
CREDO23 2026-05-14 17:40:29 +02:00
parent 8eaab12971
commit a06aec2821
22 changed files with 621 additions and 79 deletions

View file

@ -18,10 +18,12 @@ from app.agents.multi_agent_chat.constants import (
from app.agents.multi_agent_chat.subagents.mcp_tools.permissions import ( from app.agents.multi_agent_chat.subagents.mcp_tools.permissions import (
TOOLS_PERMISSIONS_BY_AGENT, TOOLS_PERMISSIONS_BY_AGENT,
) )
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.hitl.approvals.middleware_gated import (
middleware_gated_tool_permission_row,
)
from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolPermissionItem, ToolPermissionItem,
ToolsPermissions, ToolsPermissions,
mcp_tool_permission_row,
) )
from app.agents.new_chat.tools.mcp_tool import load_mcp_tools from app.agents.new_chat.tools.mcp_tool import load_mcp_tools
from app.db import SearchSourceConnector from app.db import SearchSourceConnector
@ -129,15 +131,15 @@ def _split_tools_by_permissions(
for t in tools: for t in tools:
meta: dict[str, Any] = getattr(t, "metadata", None) or {} meta: dict[str, Any] = getattr(t, "metadata", None) or {}
if meta.get("hitl") is False: if meta.get("hitl") is False:
allow.append(mcp_tool_permission_row(t)) allow.append(middleware_gated_tool_permission_row(t))
continue continue
key = _get_mcp_tool_name(t) key = _get_mcp_tool_name(t)
if key in allow_names: if key in allow_names:
allow.append(mcp_tool_permission_row(t)) allow.append(middleware_gated_tool_permission_row(t))
elif key in ask_names: elif key in ask_names:
ask.append(mcp_tool_permission_row(t)) ask.append(middleware_gated_tool_permission_row(t))
else: else:
ask.append(mcp_tool_permission_row(t)) ask.append(middleware_gated_tool_permission_row(t))
return {"allow": allow, "ask": ask} return {"allow": allow, "ask": ask}

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolsPermissions, ToolsPermissions,
) )

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolsPermissions, ToolsPermissions,
) )

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolsPermissions, ToolsPermissions,
) )

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolPermissionItem, ToolPermissionItem,
ToolsPermissions, ToolsPermissions,
) )

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolsPermissions, ToolsPermissions,
) )

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolsPermissions, ToolsPermissions,
) )

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolsPermissions, ToolsPermissions,
) )

View file

@ -5,14 +5,13 @@ from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.md_file_reader import ( from app.agents.multi_agent_chat.subagents.shared.md_file_reader import (
read_md_file, read_md_file,
) )
from app.agents.multi_agent_chat.subagents.shared.permissions import ( from app.agents.multi_agent_chat.subagents.shared.subagent_builder import (
pack_subagent,
)
from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolPermissionItem, ToolPermissionItem,
ToolsPermissions, ToolsPermissions,
merge_tools_permissions, merge_tools_permissions,
tool_permission_row,
)
from app.agents.multi_agent_chat.subagents.shared.subagent_builder import (
pack_subagent,
) )
__all__ = [ __all__ = [
@ -21,5 +20,4 @@ __all__ = [
"merge_tools_permissions", "merge_tools_permissions",
"pack_subagent", "pack_subagent",
"read_md_file", "read_md_file",
"tool_permission_row",
] ]

View file

@ -0,0 +1,19 @@
"""Middleware-gated approval primitives — interception via langchain middlewares.
Public surface:
- :func:`middleware_gated_tool_permission_row` tag a tool's row for interception.
- :func:`middleware_gated_interrupt_on` build the ``interrupt_on`` map fed
into ``HumanInTheLoopMiddleware``.
The actual ``HumanInTheLoopMiddleware`` and ``PermissionMiddleware`` instances
that consume these helpers live under
``middleware/shared/permissions/`` (rule-engine slice).
"""
from .interrupt_on import middleware_gated_interrupt_on
from .tool_row import middleware_gated_tool_permission_row
__all__ = [
"middleware_gated_interrupt_on",
"middleware_gated_tool_permission_row",
]

View file

@ -0,0 +1,23 @@
"""Build the ``interrupt_on`` map fed into ``HumanInTheLoopMiddleware``.
The map keys are tool names whose execution must be intercepted before
the call runs. Self-gated rows are intentionally excluded: their bodies
already pause via :func:`request_approval`, and intercepting them too
would double-prompt the user.
"""
from __future__ import annotations
from app.agents.multi_agent_chat.subagents.shared.tool_kinds import ToolsPermissions
def middleware_gated_interrupt_on(bucket: ToolsPermissions) -> dict[str, bool]:
"""``interrupt_on`` map for ``ask`` rows whose bodies don't self-gate."""
return {
r["name"]: True
for r in bucket["ask"]
if r.get("name") and r.get("kind") == "middleware_gated"
}
__all__ = ["middleware_gated_interrupt_on"]

View file

@ -0,0 +1,27 @@
"""Row builder tagging a tool for middleware-gated approval.
Used by MCP tool loading (``mcp_tools/index.py``) so each row carries
``kind="middleware_gated"`` and surfaces in :func:`middleware_gated_interrupt_on`.
Self-gated factories don't call this — they build rows inline with the
default ``kind`` (which collapses to self-gated).
"""
from __future__ import annotations
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolPermissionItem,
)
def middleware_gated_tool_permission_row(tool: BaseTool) -> ToolPermissionItem:
"""Build one allow/ask row tagged ``kind="middleware_gated"``."""
return {
"name": getattr(tool, "name", "") or "",
"tool": tool,
"kind": "middleware_gated",
}
__all__ = ["middleware_gated_tool_permission_row"]

View file

@ -0,0 +1,20 @@
"""Self-gated approval primitive — tools that pause from inside their own body.
Public surface:
- :func:`request_approval` entry point for sensitive tool bodies.
- :func:`self_gated_tool_permission_row` build an allow/ask row for a self-gated tool.
- :class:`HITLResult` outcome contract.
- ``DEFAULT_AUTO_APPROVED_TOOLS`` safe-by-construction allowlist.
"""
from .auto_approved import DEFAULT_AUTO_APPROVED_TOOLS
from .request import request_approval
from .result import HITLResult
from .tool_row import self_gated_tool_permission_row
__all__ = [
"DEFAULT_AUTO_APPROVED_TOOLS",
"HITLResult",
"request_approval",
"self_gated_tool_permission_row",
]

View file

@ -0,0 +1,35 @@
"""Default safe-by-construction allowlist for self-gated approvals.
Tools listed here mirror the safety profile of ``write_file`` against the
SurfSense KB: each call creates exactly one artifact in the user's own
workspace with no external visibility (drafts aren't sent; new files aren't
shared unless the user shares them later). Auto-approving them lets the agent
seed scratch artifacts without firing a popup on every call.
Members still flow through :func:`request_approval` the function returns
immediately with ``decision_type="auto_approved"`` and the original params
untouched. This keeps tool bodies (logging, metadata fetches, account
fallbacks) symmetrical with the prompted path; the only behavior change is
"no interrupt fires".
Per-search-space ``agent_permission_rules`` (when wired) take precedence and
can re-enable prompting for any of these.
"""
from __future__ import annotations
DEFAULT_AUTO_APPROVED_TOOLS: frozenset[str] = frozenset(
{
"create_gmail_draft",
"update_gmail_draft",
"create_calendar_event",
"create_notion_page",
"create_confluence_page",
"create_google_drive_file",
"create_dropbox_file",
"create_onedrive_file",
}
)
__all__ = ["DEFAULT_AUTO_APPROVED_TOOLS"]

View file

@ -0,0 +1,119 @@
"""Self-gated approval entry point — pause from inside a tool body.
Sensitive connector tools (Gmail send, Notion delete, Linear issue create)
call :func:`request_approval` to ask the user before performing the side
effect. The function emits the unified langchain HITL wire payload (so the
parallel-HITL routing layer in ``task_tool`` and ``resume_routing`` sees the
same shape it sees for middleware-gated approvals) and returns a typed
:class:`HITLResult`.
Synchronous on purpose: ``langgraph.types.interrupt`` raises ``GraphInterrupt``
inline; the langgraph runtime catches it. Making this ``async`` would only
move the throw site without changing semantics.
"""
from __future__ import annotations
import logging
from typing import Any
from langgraph.types import interrupt
from app.agents.multi_agent_chat.subagents.shared.hitl.wire import (
LC_DECISION_APPROVE,
LC_DECISION_EDIT,
LC_DECISION_REJECT,
build_lc_hitl_payload,
parse_lc_envelope,
)
from .auto_approved import DEFAULT_AUTO_APPROVED_TOOLS
from .result import HITLResult
logger = logging.getLogger(__name__)
# Decisions a self-gated card may carry back. ``"always"`` is reserved for
# permission-rule promotion (middleware-gated path) and intentionally absent
# here.
_SELF_GATED_DECISIONS: list[str] = [
LC_DECISION_APPROVE,
LC_DECISION_REJECT,
LC_DECISION_EDIT,
]
def request_approval(
*,
action_type: str,
tool_name: str,
params: dict[str, Any],
context: dict[str, Any] | None = None,
trusted_tools: list[str] | None = None,
) -> HITLResult:
"""Pause the graph for user approval and return the user's decision.
Args:
action_type: FE card discriminator (``"gmail_email_send"``,
``"mcp_tool_call"``). Forwarded as ``interrupt_type`` on the
wire so the FE can mount the right card variant.
tool_name: Registered langchain tool name (``"send_gmail_email"``)
shown in the card header and used for trust-list lookups.
params: Original tool arguments. Rendered to the user and used as
defaults when no edits are made.
context: Rich metadata (account info, folder lists, MCP server name)
forwarded verbatim to the FE for richer card chrome.
trusted_tools: Per-session allowlist; when ``tool_name`` is in it the
interrupt is skipped and the tool runs immediately.
Returns:
:class:`HITLResult` with ``rejected=True`` if the user declined or
the resume envelope was unparseable; otherwise ``params`` carries
the original args (or args shallow-merged with the user's edits on
``"edit"``).
"""
if trusted_tools and tool_name in trusted_tools:
logger.info("Tool %r is user-trusted — skipping HITL", tool_name)
return HITLResult(rejected=False, decision_type="trusted", params=dict(params))
if tool_name in DEFAULT_AUTO_APPROVED_TOOLS:
logger.info(
"Tool %r is in DEFAULT_AUTO_APPROVED_TOOLS — skipping HITL", tool_name
)
return HITLResult(
rejected=False, decision_type="auto_approved", params=dict(params)
)
payload = build_lc_hitl_payload(
tool_name=tool_name,
args=params,
allowed_decisions=_SELF_GATED_DECISIONS,
interrupt_type=action_type,
context=context,
)
approval = interrupt(payload)
parsed = parse_lc_envelope(approval)
logger.info("User decision for %r: %s", tool_name, parsed.decision_type)
if parsed.decision_type == LC_DECISION_REJECT:
return HITLResult(rejected=True, decision_type="reject", params=dict(params))
# Anything outside approve/edit at this point is unexpected — fail closed
# so a malformed FE envelope can't smuggle a side effect through.
if parsed.decision_type not in (LC_DECISION_APPROVE, LC_DECISION_EDIT):
logger.warning(
"Unrecognized decision %r for %r — rejecting for safety",
parsed.decision_type,
tool_name,
)
return HITLResult(rejected=True, decision_type="error", params=dict(params))
final_params = (
{**params, **parsed.edited_args} if parsed.edited_args else dict(params)
)
return HITLResult(
rejected=False, decision_type=parsed.decision_type, params=final_params
)
__all__ = ["request_approval"]

View file

@ -0,0 +1,34 @@
"""Outcome contract returned by :func:`request_approval`.
Lives in its own file so callers that only need the type for annotations don't
drag in ``langgraph`` imports through the entry-point module.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True, slots=True)
class HITLResult:
"""Outcome of a self-gated human-in-the-loop approval request.
Attributes:
rejected: ``True`` when the tool MUST NOT execute (user said no, or
the wire envelope was unparseable). Always check this first.
decision_type: Reason tag for logging / metrics
``"approve" | "edit" | "reject" | "trusted" | "auto_approved"
| "error"``. Callers shouldn't branch on this for control flow;
use ``rejected`` for that.
params: Final parameters to pass to the underlying tool. On
``"edit"`` this is the original ``params`` shallow-merged with
the user's edits; otherwise it's a copy of the originals.
"""
rejected: bool
decision_type: str
params: dict[str, Any] = field(default_factory=dict)
__all__ = ["HITLResult"]

View file

@ -0,0 +1,24 @@
"""Row builder for tools that self-gate via :func:`request_approval`.
The default ``kind`` is omitted on purpose: ``ToolPermissionItem`` defaults
to ``self_gated`` when ``kind`` is absent, so the row stays compact while
keeping the type system honest. Symmetric with
:mod:`hitl.approvals.middleware_gated.tool_row` so connector factories can
read the same way for either kind.
"""
from __future__ import annotations
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.subagents.shared.tool_kinds import (
ToolPermissionItem,
)
def self_gated_tool_permission_row(tool: BaseTool) -> ToolPermissionItem:
"""Build one allow/ask row for a self-gated tool (body calls ``request_approval``)."""
return {"name": getattr(tool, "name", "") or "", "tool": tool}
__all__ = ["self_gated_tool_permission_row"]

View file

@ -0,0 +1,26 @@
"""Single source of truth for the langchain HITL wire format used by every approval path.
Public surface:
- :func:`build_lc_hitl_payload` outbound (interrupt argument).
- :func:`parse_lc_envelope` + :class:`ParsedLcDecision` inbound (resume value).
- Decision-type constants for callers that care about identity rather than literals.
"""
from .decision import ParsedLcDecision, parse_lc_envelope
from .payload import (
LC_DECISION_APPROVE,
LC_DECISION_EDIT,
LC_DECISION_REJECT,
SURFSENSE_DECISION_ALWAYS,
build_lc_hitl_payload,
)
__all__ = [
"LC_DECISION_APPROVE",
"LC_DECISION_EDIT",
"LC_DECISION_REJECT",
"SURFSENSE_DECISION_ALWAYS",
"ParsedLcDecision",
"build_lc_hitl_payload",
"parse_lc_envelope",
]

View file

@ -0,0 +1,121 @@
"""Parse the langchain HITL resume envelope into a typed decision.
Both self-gated approvals (``request_approval``) and middleware-gated paths
(``PermissionMiddleware``) receive the user's reply through langgraph's
``Command(resume=...)`` channel as ``{"decisions": [{"type": ..., ...}]}``.
This module owns the decoding so the wire-shape knowledge lives in exactly
one place; callers project the parsed values into their own domain decisions
(``HITLResult`` for self-gated, ``decision_type`` for permissions) without
re-implementing the envelope walk.
Failing closed: any unrecognized envelope shape collapses to
``decision_type="reject"`` (with a warning) so callers never proceed on
ambiguous input.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True)
class ParsedLcDecision:
"""Decoded resume reply with the fields callers actually need.
Attributes:
decision_type: Lower-cased decision identifier ``"approve"``,
``"reject"``, ``"edit"``, ``"always"``, or any custom value the
FE may emit. Callers map this to their own domain semantics.
edited_args: Populated only on ``"edit"`` replies that actually carry
args; ``None`` otherwise so callers can use truthiness directly.
message: Free-form user feedback (typically attached to ``"reject"``).
``None`` when absent or when the value isn't a non-empty string.
"""
decision_type: str
edited_args: dict[str, Any] | None = None
message: str | None = None
def parse_lc_envelope(envelope: Any) -> ParsedLcDecision:
"""Extract a typed decision from a langgraph resume envelope.
Accepts:
- ``{"decisions": [{"type": "approve" | "reject" | "edit", ...}]}`` the
langchain HITL standard envelope.
- A bare scalar string (``"once"``, ``"always"``, ``"reject"``) used by
the legacy SurfSense permission wire. We tolerate it so the parser can
sit behind both call sites without a second adapter.
Edit args are read from the standard ``edited_action.args`` first, then
fall back to a flat ``args`` field for legacy compatibility both shapes
are produced by the FE depending on which card variant was rendered.
Args:
envelope: The raw resume value as it arrived from langgraph.
Returns:
A :class:`ParsedLcDecision` describing the user's intent.
"""
if isinstance(envelope, str):
return ParsedLcDecision(decision_type=envelope.lower())
if not isinstance(envelope, dict):
logger.warning(
"Resume envelope is not a dict (got %s); treating as reject",
type(envelope).__name__,
)
return ParsedLcDecision(decision_type="reject")
payload: dict[str, Any] = envelope
decisions = envelope.get("decisions")
if isinstance(decisions, list) and decisions:
first = decisions[0]
if isinstance(first, dict):
payload = first
raw_type = payload.get("type") or payload.get("decision_type")
if not raw_type:
logger.warning(
"Resume payload missing decision type (keys=%s); treating as reject",
list(payload.keys()),
)
return ParsedLcDecision(decision_type="reject")
decision_type = str(raw_type).lower()
edited_args = _extract_edited_args(payload) if decision_type == "edit" else None
message = _extract_message(payload)
return ParsedLcDecision(
decision_type=decision_type,
edited_args=edited_args,
message=message,
)
def _extract_edited_args(payload: dict[str, Any]) -> dict[str, Any] | None:
"""Pull non-empty edited args from either the LC nested or flat shape."""
edited_action = payload.get("edited_action")
if isinstance(edited_action, dict):
nested = edited_action.get("args")
if isinstance(nested, dict) and nested:
return nested
flat = payload.get("args")
if isinstance(flat, dict) and flat:
return flat
return None
def _extract_message(payload: dict[str, Any]) -> str | None:
"""Pull a non-empty user-feedback string, accepting either field name."""
raw = payload.get("feedback") or payload.get("message")
if isinstance(raw, str) and raw.strip():
return raw
return None
__all__ = ["ParsedLcDecision", "parse_lc_envelope"]

View file

@ -0,0 +1,85 @@
"""Build the langchain HITL ``interrupt(...)`` payload — single source of truth.
Every approval path in the multi-agent stack self-gated tool bodies that call
``request_approval``, and middleware-gated paths (``HumanInTheLoopMiddleware``,
``PermissionMiddleware``) emits the SAME wire shape from this module so the
parallel-HITL routing layer (``task_tool``, ``resume_routing``) only ever sees
one format. SurfSense-specific extras (FE card discriminator, structured
context) ride alongside the langchain standard fields without colliding with
them.
"""
from __future__ import annotations
from typing import Any
LC_DECISION_APPROVE = "approve"
LC_DECISION_REJECT = "reject"
LC_DECISION_EDIT = "edit"
# ``always`` is a SurfSense extension surfaced by ``PermissionMiddleware`` so a
# single click can promote the matched pattern to a runtime allow rule. The FE
# renders an extra button when it appears in ``allowed_decisions``.
SURFSENSE_DECISION_ALWAYS = "always"
def build_lc_hitl_payload(
*,
tool_name: str,
args: dict[str, Any],
allowed_decisions: list[str],
interrupt_type: str,
description: str | None = None,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build the unified langchain HITL interrupt payload.
Args:
tool_name: The langchain tool's registered name (drives both the action
request and the review config so the FE can pair them).
args: Tool call arguments shown to the user. ``None`` is normalized to
an empty dict so the FE always has a stable shape to render.
allowed_decisions: Subset of
``[LC_DECISION_APPROVE, LC_DECISION_REJECT, LC_DECISION_EDIT,
SURFSENSE_DECISION_ALWAYS]``. Other values are passed through but
the FE may not render a control for them.
interrupt_type: SurfSense card discriminator (``"gmail_email_send"``,
``"permission_ask"``, etc.); the FE keys off this to mount the
right card.
description: Optional human-readable line shown above the args block.
context: Optional structured metadata (account info, matched permission
rules, etc.) forwarded verbatim for richer card chrome.
Returns:
A dict suitable for ``langgraph.types.interrupt(...)``. Top-level
``action_requests`` and ``review_configs`` are what
``collect_pending_tool_calls`` reads at the routing layer; the
SurfSense extensions (``interrupt_type``, ``context``) sit alongside
them langchain ignores unknown keys, so the contract stays clean.
"""
request: dict[str, Any] = {"name": tool_name, "args": args or {}}
if description:
request["description"] = description
payload: dict[str, Any] = {
"action_requests": [request],
"review_configs": [
{
"action_name": tool_name,
"allowed_decisions": list(allowed_decisions),
}
],
"interrupt_type": interrupt_type,
}
if context:
payload["context"] = context
return payload
__all__ = [
"LC_DECISION_APPROVE",
"LC_DECISION_EDIT",
"LC_DECISION_REJECT",
"SURFSENSE_DECISION_ALWAYS",
"build_lc_hitl_payload",
]

View file

@ -1,60 +0,0 @@
"""Typed tool-permission rows: allow vs ask (``name`` + optional ``tool``)."""
from __future__ import annotations
from typing import Literal, NotRequired, TypedDict
from langchain_core.tools import BaseTool
# ``native`` rows self-gate via ``request_approval`` in the tool body;
# ``mcp`` rows are gated by ``HumanInTheLoopMiddleware`` via ``interrupt_on``.
ToolKind = Literal["native", "mcp"]
class ToolPermissionItem(TypedDict):
"""``name`` is always set; ``tool`` is present when a bound tool exists; ``kind`` defaults to ``native`` when absent."""
name: str
tool: NotRequired[BaseTool]
kind: NotRequired[ToolKind]
class ToolsPermissions(TypedDict):
"""Same shape for native factories and MCP name-only policy rows."""
allow: list[ToolPermissionItem]
ask: list[ToolPermissionItem]
def tool_permission_row(tool: BaseTool) -> ToolPermissionItem:
"""Build one allow/ask row for a loaded tool."""
return {"name": getattr(tool, "name", "") or "", "tool": tool}
def mcp_tool_permission_row(tool: BaseTool) -> ToolPermissionItem:
"""Build one allow/ask row tagged ``kind="mcp"`` so it routes through ``HumanInTheLoopMiddleware``."""
return {"name": getattr(tool, "name", "") or "", "tool": tool, "kind": "mcp"}
def merge_tools_permissions(
base: ToolsPermissions,
extra: ToolsPermissions | None,
) -> ToolsPermissions:
"""Concatenate allow/ask lists (e.g. native factory + MCP bucket) before building HITL maps."""
if not extra:
return base
return {
"allow": [*base["allow"], *extra["allow"]],
"ask": [*base["ask"], *extra["ask"]],
}
def middleware_gated_interrupt_on(
bucket: ToolsPermissions,
) -> dict[str, bool]:
"""``interrupt_on`` for ``ask`` rows whose bodies don't self-gate via ``request_approval``."""
return {
r["name"]: True
for r in bucket["ask"]
if r.get("name") and r.get("kind") == "mcp"
}

View file

@ -0,0 +1,69 @@
"""Cross-kind primitives for tool permission rows.
Subagents classify their tools into ``allow`` and ``ask`` buckets, and each
row may be either *self-gated* (the tool body calls
:func:`request_approval`) or *middleware-gated* (a middleware intercepts
the call). This module owns the shared types both kinds need:
- :data:`ToolKind` the discriminator literal.
- :class:`ToolPermissionItem` one row in an allow/ask bucket.
- :class:`ToolsPermissions` the bucket pair.
- :func:`merge_tools_permissions` concatenates two buckets (typically a
self-gated factory bucket and a middleware-gated MCP bucket).
Kind-specific helpers live under ``hitl/approvals/`` next to their gating
mechanism:
- ``hitl/approvals/self_gated/`` body-level ``request_approval`` primitive.
- ``hitl/approvals/middleware_gated/`` row builder + ``interrupt_on`` map.
"""
from __future__ import annotations
from typing import Literal, NotRequired, TypedDict
from langchain_core.tools import BaseTool
ToolKind = Literal["self_gated", "middleware_gated"]
class ToolPermissionItem(TypedDict):
"""One allow/ask row.
``name`` is always set; ``tool`` is present when a bound BaseTool exists
(absent for name-only MCP policy rows). ``kind`` defaults to
``self_gated`` when absent so existing connector factories keep working
without explicit tagging.
"""
name: str
tool: NotRequired[BaseTool]
kind: NotRequired[ToolKind]
class ToolsPermissions(TypedDict):
"""Allow/ask buckets shared by self-gated factories and middleware-gated MCP rows."""
allow: list[ToolPermissionItem]
ask: list[ToolPermissionItem]
def merge_tools_permissions(
base: ToolsPermissions,
extra: ToolsPermissions | None,
) -> ToolsPermissions:
"""Concatenate allow/ask lists (e.g. self-gated factory + middleware-gated MCP) before building HITL maps."""
if not extra:
return base
return {
"allow": [*base["allow"], *extra["allow"]],
"ask": [*base["ask"], *extra["ask"]],
}
__all__ = [
"ToolKind",
"ToolPermissionItem",
"ToolsPermissions",
"merge_tools_permissions",
]