multi_agent_chat/permissions: restructure slice + simplify factory

This commit is contained in:
CREDO23 2026-05-14 17:40:12 +02:00
parent a36b15b834
commit 8eaab12971
18 changed files with 299 additions and 231 deletions

View file

@ -1,16 +1,11 @@
"""Pattern-based allow/deny/ask middleware with HITL fallback.
"""Pattern-based allow/deny/ask middleware with HITL fallback (vertical slice).
Public surface: :class:`PermissionMiddleware` plus
:func:`normalize_permission_decision` for the streaming layer and the
:data:`PatternResolver` type for callers that register per-tool resolvers.
Public surface (one entry point only every other symbol is an internal of
the rule engine and stays inside ``middleware/``, ``ask/``, or ``deny.py``):
- :func:`build_permission_mw` construction recipe shared by every stack.
"""
from .decision import normalize_permission_decision
from .middleware import PermissionMiddleware
from .pattern_resolver import PatternResolver
from .middleware.factory import build_permission_mw
__all__ = [
"PatternResolver",
"PermissionMiddleware",
"normalize_permission_decision",
]
__all__ = ["build_permission_mw"]

View file

@ -0,0 +1,73 @@
"""Translate the unified langchain HITL envelope into permission-domain semantics.
``PermissionMiddleware`` works with the canonical shape
``{decision_type: "once" | "always" | "reject", feedback?: str, edited_args?: dict}``.
The wire envelope arriving from langgraph already lives in the LC HITL shape
(parsed once in :mod:`hitl_wire.decision`); this module performs the small
domain mapping (``approve|edit`` ``once``, ``always`` ``always``,
anything else ``reject``) without re-implementing the envelope walk.
Failing closed: any unrecognised decision becomes ``reject`` (with a warning)
so the middleware never proceeds on ambiguous input.
"""
from __future__ import annotations
import logging
from typing import Any
from app.agents.multi_agent_chat.subagents.shared.hitl.wire import (
LC_DECISION_APPROVE,
LC_DECISION_EDIT,
LC_DECISION_REJECT,
SURFSENSE_DECISION_ALWAYS,
parse_lc_envelope,
)
logger = logging.getLogger(__name__)
# ``approve`` and ``edit`` both mean "let this call go through this once". The
# legacy SurfSense bare-scalar values (``once`` / ``always`` / ``reject``)
# pass through unchanged so historical resume payloads still work.
_LC_TO_PERMISSION: dict[str, str] = {
LC_DECISION_APPROVE: "once",
LC_DECISION_EDIT: "once",
SURFSENSE_DECISION_ALWAYS: "always",
LC_DECISION_REJECT: "reject",
"once": "once",
"always": "always",
"reject": "reject",
}
def normalize_permission_decision(envelope: Any) -> dict[str, Any]:
"""Project the user's reply into the canonical permission decision shape.
Args:
envelope: The raw resume value from langgraph (LC HITL envelope, a
bare scalar string, or a pre-canonical dict).
Returns:
``{"decision_type": "once"|"always"|"reject"}`` plus optional
``feedback`` (``reject`` with a user message) and ``edited_args``
(``edit`` reply with non-empty arg overrides).
"""
parsed = parse_lc_envelope(envelope)
mapped = _LC_TO_PERMISSION.get(parsed.decision_type)
if mapped is None:
logger.warning(
"Unknown permission decision %r; treating as reject",
parsed.decision_type,
)
mapped = "reject"
out: dict[str, Any] = {"decision_type": mapped}
if parsed.message:
out["feedback"] = parsed.message
if parsed.edited_args:
out["edited_args"] = parsed.edited_args
return out
__all__ = ["normalize_permission_decision"]

View file

@ -0,0 +1,10 @@
"""Apply ``edit`` permission decisions to tool calls.
Edited-arg extraction now lives in :mod:`hitl_wire.decision` (single parser
for all approval paths); this module owns the merge step that produces a
fresh tool-call dict for the orchestrator.
"""
from .merge import merge_edited_args
__all__ = ["merge_edited_args"]

View file

@ -0,0 +1,22 @@
"""Apply edited args to a tool call (shallow merge, no mutation).
Edited values override originals; keys absent from ``edited_args`` keep
their original values, so partial edits are safe. Returns a NEW tool-call
dict so the caller can swap it into ``AIMessage.tool_calls`` without
aliasing the live message object.
"""
from __future__ import annotations
from typing import Any
def merge_edited_args(
tool_call: dict[str, Any], edited_args: dict[str, Any]
) -> dict[str, Any]:
original_args = tool_call.get("args") or {}
merged_args = {**original_args, **edited_args}
return {**tool_call, "args": merged_args}
__all__ = ["merge_edited_args"]

View file

@ -0,0 +1,79 @@
"""Build the permission-ask interrupt payload (LC HITL wire + SurfSense context).
The FE's PermissionCard renders from:
- Standard langchain fields (``action_requests``, ``review_configs``) drive
the action chrome and the parallel-HITL routing layer (``task_tool``,
``resume_routing``) that batches concurrent approvals.
- ``interrupt_type="permission_ask"`` selects the permission card variant.
- ``context.patterns`` / ``context.rules`` explain *why* the ask fired.
- ``context.always`` the patterns the user can promote to a permanent
allow rule with a single ``"always"`` reply.
"""
from __future__ import annotations
from typing import Any
from app.agents.multi_agent_chat.subagents.shared.hitl.wire import (
LC_DECISION_APPROVE,
LC_DECISION_EDIT,
LC_DECISION_REJECT,
SURFSENSE_DECISION_ALWAYS,
build_lc_hitl_payload,
)
from app.agents.new_chat.permissions import Rule
PERMISSION_ASK_INTERRUPT_TYPE = "permission_ask"
# The full palette a permission card may surface: approve once, edit-then-
# approve, reject, or "always" to promote the matched pattern.
_PERMISSION_ASK_DECISIONS: list[str] = [
LC_DECISION_APPROVE,
LC_DECISION_REJECT,
LC_DECISION_EDIT,
SURFSENSE_DECISION_ALWAYS,
]
def build_permission_ask_payload(
*,
tool_name: str,
args: dict[str, Any],
patterns: list[str],
rules: list[Rule],
) -> dict[str, Any]:
"""Build the permission-ask interrupt payload.
Args:
tool_name: The tool whose call is being reviewed.
args: The tool call arguments shown in the card.
patterns: Wildcard patterns the call matched (drives ``always``).
rules: Matched ruleset entries surfaced for explainability.
Returns:
A dict suitable for ``langgraph.types.interrupt(...)`` carrying both
the LC HITL standard fields and SurfSense-specific context.
"""
context: dict[str, Any] = {
"patterns": patterns,
"rules": [
{
"permission": r.permission,
"pattern": r.pattern,
"action": r.action,
}
for r in rules
],
"always": patterns,
}
return build_lc_hitl_payload(
tool_name=tool_name,
args=args,
allowed_decisions=_PERMISSION_ASK_DECISIONS,
interrupt_type=PERMISSION_ASK_INTERRUPT_TYPE,
context=context,
)
__all__ = ["PERMISSION_ASK_INTERRUPT_TYPE", "build_permission_ask_payload"]

View file

@ -1,12 +1,12 @@
"""Request a permission decision from the user (side-effectful entry point).
"""Side-effectful entry point: pause the graph and return the permission decision.
Wraps :func:`langgraph.types.interrupt` with the OTel spans that the
SurfSense dashboard expects, then normalises the resume value through
:func:`decision.normalize_permission_decision`.
Wraps :func:`langgraph.types.interrupt` with the OTel spans the SurfSense
dashboard expects, then projects the resume value through
:func:`normalize_permission_decision` so the middleware downstream only
sees the canonical permission-domain shape.
When ``emit_interrupt`` is ``False`` the call short-circuits to
``reject``; this is used by non-interactive deployments where ``ask`` must
not block.
When ``emit_interrupt`` is ``False`` the call short-circuits to ``reject``;
this is used by non-interactive deployments where ``ask`` must not block.
"""
from __future__ import annotations
@ -18,8 +18,8 @@ from langgraph.types import interrupt
from app.agents.new_chat.permissions import Rule
from app.observability import otel as ot
from ..decision import normalize_permission_decision
from .payload import build_permission_ask_payload
from .decision import normalize_permission_decision
from .payload import PERMISSION_ASK_INTERRUPT_TYPE, build_permission_ask_payload
def request_permission_decision(
@ -30,6 +30,7 @@ def request_permission_decision(
rules: list[Rule],
emit_interrupt: bool,
) -> dict[str, Any]:
"""Pause for an ``ask`` decision; return the canonical permission decision dict."""
if not emit_interrupt:
return {"decision_type": "reject"}
@ -43,7 +44,7 @@ def request_permission_decision(
pattern=patterns[0] if patterns else None,
extra={"permission.patterns": list(patterns)},
),
ot.interrupt_span(interrupt_type="permission_ask"),
ot.interrupt_span(interrupt_type=PERMISSION_ASK_INTERRUPT_TYPE),
):
decision = interrupt(payload)
return normalize_permission_decision(decision)

View file

@ -1,91 +0,0 @@
"""Coerce inbound permission decisions to a canonical dict shape.
Two wire formats are accepted:
- SurfSense legacy: ``{"decision_type": "once"|"always"|"reject", "feedback"?}``.
- LangChain HITL envelope: ``{"decisions": [{"type": "approve"|"edit"|"reject", ...}]}``.
The middleware downstream only inspects the canonical shape returned here,
so adding a new envelope means changing this module alone.
The middleware fails closed: any unrecognised payload becomes ``reject``
(with a warning) so the agent never proceeds on ambiguous input.
When the reply is an ``edit``, the result keeps ``decision_type="once"``
(the call still goes through) and adds an ``edited_args`` key holding the
user-modified ``args`` dict. The orchestrator merges those into the
``tool_call`` before keeping it; see :mod:`interrupt.edit.merge`.
"""
from __future__ import annotations
import logging
from typing import Any
from .interrupt.edit import extract_edited_args
logger = logging.getLogger(__name__)
# ``edit`` collapses to ``once``; any ``edited_args`` ride on the result.
_LC_TYPE_TO_PERMISSION_DECISION: dict[str, str] = {
"approve": "once",
"reject": "reject",
"edit": "once",
}
def normalize_permission_decision(decision: Any) -> dict[str, Any]:
"""Return ``{"decision_type": ..., "feedback"?: str, "edited_args"?: dict}``."""
if isinstance(decision, str):
return {"decision_type": decision}
if not isinstance(decision, dict):
logger.warning(
"Unrecognized permission resume value (%s); treating as reject",
type(decision).__name__,
)
return {"decision_type": "reject"}
if decision.get("decision_type"):
return decision
payload: dict[str, Any] = decision
decisions = decision.get("decisions")
if isinstance(decisions, list) and decisions:
first = decisions[0]
if isinstance(first, dict):
payload = first
raw_type = payload.get("type") or payload.get("decision_type")
if not raw_type:
logger.warning(
"Permission resume missing decision type (keys=%s); treating as reject",
list(payload.keys()),
)
return {"decision_type": "reject"}
raw_type = str(raw_type).lower()
mapped = _LC_TYPE_TO_PERMISSION_DECISION.get(raw_type)
if mapped is None:
# Tolerate legacy values arriving without ``decision_type`` wrapping.
if raw_type in {"once", "always", "reject"}:
mapped = raw_type
else:
logger.warning(
"Unknown permission decision type %r; treating as reject", raw_type
)
mapped = "reject"
out: dict[str, Any] = {"decision_type": mapped}
feedback = payload.get("feedback") or payload.get("message")
if isinstance(feedback, str) and feedback.strip():
out["feedback"] = feedback
if raw_type == "edit":
edited = extract_edited_args(payload)
if edited:
out["edited_args"] = edited
return out
__all__ = ["normalize_permission_decision"]

View file

@ -1,6 +0,0 @@
"""Apply ``edit`` permission decisions to tool calls (extract + merge)."""
from .extract import extract_edited_args
from .merge import merge_edited_args
__all__ = ["extract_edited_args", "merge_edited_args"]

View file

@ -1,34 +0,0 @@
"""Extract edited args from a permission decision payload.
Two shapes are accepted (mirrors :func:`app.agents.new_chat.tools.hitl._parse_decision`):
- LangChain HITL envelope: ``{"edited_action": {"args": {...}}}``.
- Legacy flat shape: ``{"args": {...}}``.
Returns ``None`` when no edited args are present. The orchestrator decides
whether to merge them (see :mod:`interrupt.edit.merge`); this module is pure parsing.
"""
from __future__ import annotations
from typing import Any
def extract_edited_args(decision_payload: dict[str, Any] | None) -> dict[str, Any] | None:
if not isinstance(decision_payload, dict):
return None
edited_action = decision_payload.get("edited_action")
if isinstance(edited_action, dict):
edited_args = edited_action.get("args")
if isinstance(edited_args, dict):
return edited_args
flat_args = decision_payload.get("args")
if isinstance(flat_args, dict):
return flat_args
return None
__all__ = ["extract_edited_args"]

View file

@ -1,25 +0,0 @@
"""Apply edited args to a tool call.
Semantics match :func:`app.agents.new_chat.tools.hitl.request_approval`'s
``final_params = {**params, **edited_params}`` shallow merge, edited
values override originals. Keys absent from ``edited_args`` keep their
original values, so partial edits are safe.
Returns a NEW ``tool_call`` dict (the input is not mutated) so the caller
can swap it into the ``AIMessage.tool_calls`` list without aliasing.
"""
from __future__ import annotations
from typing import Any
def merge_edited_args(
tool_call: dict[str, Any], edited_args: dict[str, Any]
) -> dict[str, Any]:
original_args = tool_call.get("args") or {}
merged_args = {**original_args, **edited_args}
return {**tool_call, "args": merged_args}
__all__ = ["merge_edited_args"]

View file

@ -1,43 +0,0 @@
"""Build the ``permission_ask`` interrupt payload (pure data).
The frontend's streaming layer keys off ``type`` and renders the approval
card from ``action`` (the tool call being reviewed) and ``context``
(the matched rules and patterns that prompted the ask). ``context.always``
lists the patterns the user can promote to a permanent allow rule with a
single ``"always"`` reply.
"""
from __future__ import annotations
from typing import Any
from app.agents.new_chat.permissions import Rule
def build_permission_ask_payload(
*,
tool_name: str,
args: dict[str, Any],
patterns: list[str],
rules: list[Rule],
) -> dict[str, Any]:
return {
"type": "permission_ask",
# ``params`` (not ``args``) is what SurfSense's streaming normalizer forwards.
"action": {"tool": tool_name, "params": args or {}},
"context": {
"patterns": patterns,
"rules": [
{
"permission": r.permission,
"pattern": r.pattern,
"action": r.action,
}
for r in rules
],
"always": patterns,
},
}
__all__ = ["build_permission_ask_payload"]

View file

@ -5,10 +5,10 @@ LangChain's :class:`HumanInTheLoopMiddleware` only supports a static
allow/deny/ask, no glob patterns, no per-space/per-thread overrides, and
no auto-deny synthesis.
This middleware layers OpenCode's wildcard-ruleset model on top of
SurfSense's ``interrupt({type, action, context})`` payload shape (see
:mod:`app.agents.new_chat.tools.hitl`) so the frontend keeps working
unchanged.
This middleware layers OpenCode's wildcard-ruleset model on top of the
unified langchain HITL wire format (see :mod:`hitl_wire`), so it sits
beside ``HumanInTheLoopMiddleware`` and self-gated approvals on a single
parallel-HITL routing layer in ``task_tool`` + ``resume_routing``.
Per-tool-call flow inside :meth:`_process`:
@ -47,13 +47,13 @@ from langgraph.runtime import Runtime
from app.agents.new_chat.errors import CorrectedError, RejectedError
from app.agents.new_chat.permissions import Ruleset
from ..ask.edit import merge_edited_args
from ..ask.request import request_permission_decision
from ..deny import build_deny_message
from ..interrupt.edit import merge_edited_args
from ..interrupt.request import request_permission_decision
from ..pattern_resolver import PatternResolver
from ..runtime_promote import persist_always
from .evaluation import evaluate_tool_call
from .pattern_resolver import PatternResolver
from .ruleset_view import all_rulesets
from .runtime_promote import persist_always
logger = logging.getLogger(__name__)

View file

@ -24,7 +24,7 @@ from app.agents.new_chat.permissions import (
evaluate_many,
)
from ..pattern_resolver import PatternResolver, default_pattern_resolver
from .pattern_resolver import PatternResolver, default_pattern_resolver
logger = logging.getLogger(__name__)

View file

@ -0,0 +1,68 @@
"""Construction recipe for :class:`PermissionMiddleware` shared across stacks.
Single source of truth used by both the main-agent stack and every subagent
stack. Rule layers are evaluated earliest-to-latest (last match wins,
matching OpenCode's ``permission/index.ts`` evaluation order):
1. ``surfsense_defaults`` single ``allow */*`` rule. Connector tools
already self-gate via :func:`request_approval`, so the rule engine only
needs to *deny* what the user has explicitly forbidden; the default
``ask`` fallback would otherwise double-prompt every safe read-only
call.
2. ``extra_rulesets`` caller-supplied policies. The KB subagent contributes
its destructive-FS ``ask`` rules here; connectors will follow once
they migrate off ``interrupt_on``.
Connector deny synthesis from ``new_chat._synthesize_connector_deny_rules``
is intentionally NOT replicated: the multi-agent orchestrator already
excludes entire subagents whose required connectors are missing
(``SUBAGENT_TO_REQUIRED_CONNECTOR_MAP``), so the per-tool deny pass is
redundant here.
"""
from __future__ import annotations
from app.agents.new_chat.feature_flags import AgentFeatureFlags
from app.agents.new_chat.permissions import Rule, Ruleset
from .core import PermissionMiddleware
_SURFSENSE_DEFAULTS = Ruleset(
rules=[Rule(permission="*", pattern="*", action="allow")],
origin="surfsense_defaults",
)
def build_permission_mw(
*,
flags: AgentFeatureFlags,
extra_rulesets: list[Ruleset] | None = None,
) -> PermissionMiddleware | None:
"""Return a configured :class:`PermissionMiddleware` or ``None`` when no work is needed.
Args:
flags: Feature toggles. ``enable_permission`` switches the engine on;
``disable_new_agent_stack`` overrides everything for safety.
extra_rulesets: Caller-supplied rulesets layered after the defaults.
Subagents pass their own policy here so each subagent owns its
rules without aliasing a shared engine. Presence of any extra
ruleset forces the middleware on regardless of
``enable_permission`` an explicit ``ask`` rule always asks.
Returns:
``None`` when the engine has no rules to enforce
(``enable_permission=False`` and no extras); a configured middleware
otherwise.
"""
permission_enabled = flags.enable_permission and not flags.disable_new_agent_stack
has_extras = bool(extra_rulesets)
if not (permission_enabled or has_extras):
return None
rulesets: list[Ruleset] = [_SURFSENSE_DEFAULTS]
if extra_rulesets:
rulesets.extend(extra_rulesets)
return PermissionMiddleware(rulesets=rulesets)
__all__ = ["build_permission_mw"]

View file

@ -3,7 +3,8 @@
Mirrors ``middleware/stack.py`` (the orchestrator's middleware stack) but
exposes its contents as a dict keyed by purpose so specialists can pick
the entries they need and decide ordering. The default consumer
(``pack_subagent``) prepends every non-``None`` value in insertion order.
(:func:`pack_subagent`) prepends every non-``None`` value in insertion
order, so ``None`` slots are silently skipped.
Registry subagents never touch the SurfSense filesystem that capability
belongs to ``knowledge_base`` so no FS middleware is exposed here.
@ -13,6 +14,9 @@ from __future__ import annotations
from typing import Any
from app.agents.new_chat.feature_flags import AgentFeatureFlags
from ..shared.permissions import build_permission_mw
from ..shared.resilience import ResilienceMiddlewares
from ..shared.todos import build_todos_mw
@ -20,9 +24,24 @@ from ..shared.todos import build_todos_mw
def build_subagent_middleware_stack(
*,
resilience: ResilienceMiddlewares,
flags: AgentFeatureFlags | None = None,
) -> dict[str, Any]:
"""Assemble the dict of middlewares prepended to every subagent's stack.
Args:
resilience: Pre-built retry / fallback / call-limit middlewares
(shared with the orchestrator stack to keep behaviour symmetric).
flags: Feature flags driving optional layers. ``None`` disables the
permission layer (used in tests that only need todos+resilience).
Returns:
Insertion-ordered dict; ``None`` values are tolerated and dropped by
the consumer so callers can flip slots on/off without reshaping.
"""
permission = build_permission_mw(flags=flags) if flags is not None else None
return {
"todos": build_todos_mw(),
"permission": permission,
"retry": resilience.retry,
"fallback": resilience.fallback,
"model_call_limit": resilience.model_call_limit,