Harden Linear and Slack MCP subagent permissions.

This commit is contained in:
CREDO23 2026-04-29 20:24:21 +02:00
parent 3dec2a7327
commit 41cb4a567b
3 changed files with 243 additions and 23 deletions

View file

@ -0,0 +1,166 @@
"""Linear provider specialist subagent.
This file is intentionally standalone so provider specialists can be reviewed
and evolved independently (one provider per file).
"""
from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
from app.agents.new_chat.permissions import Rule, Ruleset
from app.agents.new_chat.subagents.constants import NON_PROVIDER_STATE_MUTATION_DENY
from app.services.mcp_oauth.registry import (
LINEAR_MCP_READONLY_TOOL_NAMES,
linear_mcp_original_tool_name,
)
if TYPE_CHECKING:
from deepagents import SubAgent
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
# Read vs write Linear MCP tools are defined in
# ``app.services.mcp_oauth.registry`` (``LINEAR_MCP_READONLY_TOOL_NAMES`` /
# ``LINEAR_MCP_WRITE_TOOL_NAMES``). Any other Linear-domain tool requires approval.
LINEAR_SYSTEM_PROMPT = """You are the linear_specialist subagent for SurfSense.
Role:
- You are the Linear domain specialist. Handle Linear-only requests accurately.
Primary objective:
- Resolve the user's Linear task and return a concise, auditable result.
Routing boundary:
- Use this subagent for Linear-domain tasks (issues, status, assignees, labels,
teams, and project references).
- If the task is primarily non-Linear or cross-connector orchestration, return
status=needs_input and hand control back to the parent with the exact next hop.
Execution steps:
1) Verify Linear access first (use get_connected_accounts if needed).
2) Prefer read/list tools first to gather current issue facts before concluding.
3) Track key identifiers in your reasoning: issue ID, issue key, team ID, label ID.
4) If required identifiers are missing, ask the parent for exactly what is missing.
5) Return a compact result with findings + evidence references.
Output format:
- status: success | needs_input | blocked | error
- summary: one short paragraph
- evidence: bullet list of concrete IDs / issue keys used
- next_step: one sentence (only when blocked or needs_input)
Constraints:
- Do not invent issue keys, IDs, or workflow state names.
- Mutating Linear operations are allowed only with explicit approval.
- If Linear connector access is unavailable, stop and return status=blocked.
"""
def _select_linear_tools(tools: Sequence[BaseTool]) -> list[BaseTool]:
"""Keep Linear tools plus minimal shared read utilities."""
allowed_exact = {
"get_connected_accounts",
"read_file",
"ls",
"glob",
"grep",
}
selected: list[BaseTool] = []
for tool in tools:
if tool.name in allowed_exact:
selected.append(tool)
continue
if linear_mcp_original_tool_name(tool.name) is not None:
selected.append(tool)
continue
if tool.name.startswith("linear_") or tool.name.endswith("_linear_issue"):
selected.append(tool)
return selected
def _is_linear_readonly_tool_name(name: str) -> bool:
"""Return True when a tool name maps to a read-only Linear MCP operation."""
base = linear_mcp_original_tool_name(name)
return base is not None and base in LINEAR_MCP_READONLY_TOOL_NAMES
def _is_linear_domain_tool_name(name: str) -> bool:
"""Return True for Linear-domain tools handled by this specialist."""
if linear_mcp_original_tool_name(name) is not None:
return True
return name.startswith("linear_") or name.endswith("_linear_issue")
def _permission_middleware(*, selected_tools: Sequence[BaseTool]) -> Any:
"""Permission policy for Linear specialist."""
from app.agents.new_chat.middleware.permission import PermissionMiddleware
ask_tools = sorted(
{
tool.name
for tool in selected_tools
if _is_linear_domain_tool_name(tool.name)
and not _is_linear_readonly_tool_name(tool.name)
}
)
rules: list[Rule] = [Rule(permission="*", pattern="*", action="allow")]
rules.extend(
Rule(permission=name, pattern="*", action="deny")
for name in NON_PROVIDER_STATE_MUTATION_DENY
)
rules.extend(
Rule(permission=name, pattern="*", action="ask")
for name in ask_tools
)
return PermissionMiddleware(
rulesets=[Ruleset(rules=rules, origin="subagent_linear_specialist")]
)
def _wrap_subagent_middleware(
*,
selected_tools: Sequence[BaseTool],
extra_middleware: Sequence[Any] | None,
) -> list[Any]:
"""Apply standard middleware chain used by other subagents."""
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from app.agents.new_chat.middleware import DedupHITLToolCallsMiddleware
return [
*(extra_middleware or []),
_permission_middleware(selected_tools=selected_tools),
PatchToolCallsMiddleware(),
DedupHITLToolCallsMiddleware(agent_tools=list(selected_tools)),
]
def build_linear_specialist_subagent(
*,
tools: Sequence[BaseTool],
model: BaseChatModel | None = None,
extra_middleware: Sequence[Any] | None = None,
) -> SubAgent:
"""Build the ``linear_specialist`` provider subagent spec."""
selected_tools = _select_linear_tools(tools)
spec: dict[str, Any] = {
"name": "linear_specialist",
"description": (
"Linear operations specialist for issue and workflow requests, "
"with strict evidence tracking and approval-gated mutating operations."
),
"system_prompt": LINEAR_SYSTEM_PROMPT,
"tools": selected_tools,
"middleware": _wrap_subagent_middleware(
selected_tools=selected_tools,
extra_middleware=extra_middleware,
),
}
if model is not None:
spec["model"] = model
return spec # type: ignore[return-value]

View file

@ -18,23 +18,26 @@ if TYPE_CHECKING:
from langchain_core.tools import BaseTool
# Slack MCP references used for this provider policy:
# Official references:
# - https://docs.slack.dev/ai/slack-mcp-server
# - https://www.npmjs.com/package/@modelcontextprotocol/server-slack
#
# We explicitly gate known write/mutation operations behind approval (`ask`)
# instead of relying on broad generic write heuristics.
SLACK_MUTATION_TOOL_NAMES: frozenset[str] = frozenset(
# Policy: only known read-only Slack tools are auto-allowed. Any other
# ``slack_*`` tool is treated as mutating and requires explicit approval.
SLACK_READONLY_TOOL_NAMES: frozenset[str] = frozenset(
{
# modelcontextprotocol server
"slack_post_message",
"slack_reply_to_thread",
"slack_add_reaction",
# Slack-hosted MCP naming variants
"slack_send_message",
"slack_draft_message",
"slack_create_canvas",
"slack_update_canvas",
# Slack-hosted MCP read tools
"slack_search_channels",
"slack_read_channel",
"slack_read_thread",
"slack_read_canvas",
"slack_read_user_profile",
# modelcontextprotocol/server-slack read tools
"slack_list_channels",
"slack_get_channel_history",
"slack_get_thread_replies",
"slack_get_users",
"slack_get_user_profile",
}
)
@ -92,17 +95,25 @@ def _select_slack_tools(tools: Sequence[BaseTool]) -> list[BaseTool]:
return selected
def _permission_middleware() -> Any:
def _permission_middleware(*, selected_tools: Sequence[BaseTool]) -> Any:
"""Permission policy for Slack specialist.
Intent:
- Allow Slack-domain operations by default.
- Gate known Slack mutating operations behind approval (`ask`).
- Gate Slack mutating operations behind approval (`ask`).
- Hard-deny non-Slack state mutations, especially KB virtual filesystem
mutation and parent-context mutation tools.
"""
from app.agents.new_chat.middleware.permission import PermissionMiddleware
ask_tools = sorted(
{
tool.name
for tool in selected_tools
if tool.name.startswith("slack_")
and tool.name not in SLACK_READONLY_TOOL_NAMES
}
)
rules: list[Rule] = [Rule(permission="*", pattern="*", action="allow")]
rules.extend(
Rule(permission=name, pattern="*", action="deny")
@ -110,7 +121,7 @@ def _permission_middleware() -> Any:
)
rules.extend(
Rule(permission=name, pattern="*", action="ask")
for name in SLACK_MUTATION_TOOL_NAMES
for name in ask_tools
)
return PermissionMiddleware(
rulesets=[Ruleset(rules=rules, origin="subagent_slack_specialist")]
@ -129,7 +140,7 @@ def _wrap_subagent_middleware(
return [
*(extra_middleware or []),
_permission_middleware(),
_permission_middleware(selected_tools=selected_tools),
PatchToolCallsMiddleware(),
DedupHITLToolCallsMiddleware(agent_tools=list(selected_tools)),
]

View file

@ -14,10 +14,57 @@ accuracy high.
from __future__ import annotations
import re
from dataclasses import dataclass, field
from app.db import SearchSourceConnectorType
# Linear hosted MCP (https://linear.app/docs/mcp). Tool names are matched at
# discovery time: names the server does not advertise are ignored.
# See also https://github.com/linear/linear/issues/1049 for server-reported names.
LINEAR_MCP_WRITE_TOOL_NAMES: frozenset[str] = frozenset({"save_issue"})
LINEAR_MCP_READONLY_TOOL_NAMES: frozenset[str] = frozenset(
{
# Issues
"list_issues",
"get_issue",
"list_my_issues",
"list_issue_statuses",
"list_issue_labels",
"list_comments",
# People & teams
"list_users",
"get_user",
"list_teams",
"get_team",
# Projects & planning
"list_projects",
"get_project",
"list_project_labels",
"list_cycles",
# Documents
"list_documents",
"get_document",
# Misc read
"search_documentation",
}
)
LINEAR_MCP_TOOL_NAMES: frozenset[str] = (
LINEAR_MCP_READONLY_TOOL_NAMES | LINEAR_MCP_WRITE_TOOL_NAMES
)
_LINEAR_MCP_PREFIXED_NAME_RE = re.compile(r"^linear_\d+_(.+)$")
def linear_mcp_original_tool_name(name: str) -> str | None:
"""Map ``linear_<connector_id>_<tool>`` or bare MCP tool name to base name."""
m = _LINEAR_MCP_PREFIXED_NAME_RE.match(name)
if m:
base = m.group(1)
return base if base in LINEAR_MCP_TOOL_NAMES else None
if name in LINEAR_MCP_TOOL_NAMES:
return name
return None
@dataclass(frozen=True)
class MCPServiceConfig:
@ -50,12 +97,8 @@ MCP_SERVICES: dict[str, MCPServiceConfig] = {
name="Linear",
mcp_url="https://mcp.linear.app/mcp",
connector_type="LINEAR_CONNECTOR",
allowed_tools=[
"list_issues",
"get_issue",
"save_issue",
],
readonly_tools=frozenset({"list_issues", "get_issue"}),
allowed_tools=sorted(LINEAR_MCP_TOOL_NAMES),
readonly_tools=LINEAR_MCP_READONLY_TOOL_NAMES,
account_metadata_keys=["organization_name", "organization_url_key"],
),
"jira": MCPServiceConfig(