plano/cli/planoai/skills.py
Spherrrical 5e8d27fd3c fix(skills): honor skills-only orchestrator decisions, dedupe runtime helpers, warn on dropped picks
Addresses the code-review findings on 7f5bf641:

- Honor skills-only decisions: RouteDecision.route_name is now Option<String> and the orchestrator emits a decision when routes is empty but skills is non-empty. The LLM handler falls back to the originally-requested model and still injects activated skill bodies, matching the contract in docs/source/resources/skills.rst.
- Warn on allow-list misses: resolve_for_route now returns a SkillResolution that splits drops into "not allow-listed for this route" vs "not in catalog (hallucinated)". brightstaff logs each bucket so misconfigured routing_preferences[].skills lists become visible instead of vanishing silently.
- Consolidate runtime: common::skills_runtime is now the single source of truth (referenced_skills_catalog, resolve_for_route, resolve_selected_skills, augment_system_prompt_with_skills). brightstaff drops its local re-implementations and calls into common.
- Tests: 11 new tests in common::skills_runtime (catalog union, allow-list intersection, dedup, hallucination handling, XML escaping, body size cap) and 6 new tests in brightstaff::handlers::llm::model_selection cover inject_activated_skills_into_request, including the first-system-message rule and the Parts->Text flatten — both now documented on the function.
- Cap skill body size at 32 KiB with a UTF-8-safe tail-trim + marker so an oversized SKILL.md cannot blow the downstream context window.
- XML-escape skill name and base_dir in the <skill_content> wrapper as defense-in-depth (names are validated upstream, but the wrapper sits inside the system prompt).
- Bound find_project_root at $HOME plus a 30-parent depth cap so CLI invocations outside HOME no longer walk to /.
2026-05-18 12:39:21 -07:00

451 lines
14 KiB
Python

"""Agent Skills discovery for Plano.
Parses SKILL.md files from .plano/skills/ (project scope) and ~/.plano/skills/
(user scope) following the Agent Skills specification:
https://agentskills.io/specification.md
The parser is intentionally lenient (per the "Adding skills support" guide):
warn on cosmetic issues but only skip a skill when its YAML is unparseable or
its required `description` field is missing.
"""
from __future__ import annotations
import json
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
import yaml
from planoai.utils import getLogger
log = getLogger(__name__)
# Project-scope skills directory, relative to a project root.
PROJECT_SKILLS_DIR = Path(".plano") / "skills"
# User-scope skills directory. NOTE: expanded once at import time, so a later
# change to $HOME is not reflected in this constant.
USER_SKILLS_DIR = Path(os.path.expanduser("~/.plano/skills"))
# Universal Agent Skills install location used by `npx skills add` (vercel-labs/add-skill).
# Auto-trusted: same security posture as ~/.plano/skills, no project trust needed.
# Also expanded once at import time.
AGENTS_SKILLS_DIR = Path(os.path.expanduser("~/.agents/skills"))
# 5 KiB. NOTE(review): presumably the budget that `total_catalog_size` results
# are compared against; the comparison is not in this part of the file -- confirm.
MAX_CATALOG_BYTES = 5 * 1024
# Upper bound on directory entries examined per skills root by `_iter_skill_dirs`.
MAX_DIRS_SCANNED = 2000
# Skill-name rule: lowercase alphanumeric segments joined by single hyphens.
# Rejects leading, trailing, and consecutive ("--") hyphens, which is exactly
# what the naming warning emitted by `parse_skill_md` tells the user.
_NAME_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")
def trusted_projects_file() -> Path:
    """Return the location of the trusted-projects registry.

    The ``~`` is expanded on every call rather than once at import time, so
    a caller (typically a test) that changes ``$HOME`` afterwards gets the
    path under the new home directory.
    """
    expanded = os.path.expanduser("~/.plano/trusted_projects.json")
    return Path(expanded)
def is_project_trusted(project_root: Path) -> bool:
    """Return True if `project_root` is listed in `~/.plano/trusted_projects.json`.

    Project-scope skills come from arbitrary repos and are gated on this trust
    decision (set with `planoai skills trust`). Single source of truth, shared
    between the `skills_cmd` CLI surface and the render pipeline.

    The check fails closed: a missing, unreadable, non-UTF-8, or malformed
    trust file -- or one whose ``trusted_projects`` value is not a list --
    means "not trusted" rather than an exception. Entries that are not
    strings, or that cannot be resolved to a path, are ignored.

    Args:
        project_root: Directory to check; compared after ``resolve()``.

    Returns:
        True only when some entry resolves to the same path as `project_root`.
    """
    try:
        # Open directly instead of exists()-then-open: a missing file is just
        # an OSError, and there is no window between the check and the read.
        with trusted_projects_file().open("r", encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return False

    if not isinstance(data, dict):
        return False
    trusted = data.get("trusted_projects", [])
    if not isinstance(trusted, list):
        return False

    target = str(project_root.resolve())
    for entry in trusted:
        if not isinstance(entry, str):
            continue
        try:
            candidate = str(Path(entry).resolve())
        except (OSError, ValueError, RuntimeError):
            # Unresolvable entry (embedded NUL, symlink loop, ...): skip it.
            continue
        if candidate == target:
            return True
    return False
@dataclass(frozen=True)
class SkillDiagnostic:
    """One immutable finding produced while parsing or discovering a skill."""

    # Either "warn" or "error".
    severity: str
    # Human-readable description of the finding.
    message: str
    # File the finding refers to.
    path: Path
@dataclass
class Skill:
    """A parsed skill: required identity fields plus optional frontmatter."""

    name: str
    description: str
    location: Path
    base_dir: Path
    body: str
    scope: str
    compatibility: str | None = None
    license: str | None = None
    metadata: dict = field(default_factory=dict)
    allowed_tools: str | None = None

    def to_dict(self) -> dict:
        """Return a YAML-friendly mapping for embedding in rendered config.

        Paths are stringified (``location`` is exposed under the ``"path"``
        key) and ``metadata`` is shallow-copied, or ``None`` when empty.
        """
        metadata_copy = dict(self.metadata) if self.metadata else None
        payload = {
            "name": self.name,
            "description": self.description,
            "path": str(self.location),
            "base_dir": str(self.base_dir),
            "scope": self.scope,
            "body": self.body,
            "compatibility": self.compatibility,
            "license": self.license,
            "metadata": metadata_copy,
            "allowed_tools": self.allowed_tools,
        }
        return payload
_MAX_PROJECT_ROOT_WALK_DEPTH = 30


def find_project_root(start: Path | None = None) -> Path:
    """Walk up from ``start`` looking for ``.plano/``, then ``.git/``.

    The walk is bounded so a CLI invocation in a deeply-nested or
    pathological directory does not iterate all the way to ``/`` on every
    call. Two bounds apply, whichever fires first:

    * **$HOME**: when ``start`` is inside the user's home directory, the
      walk stops at ``$HOME`` itself; its parents are never inspected.
    * **Hard depth cap** (``_MAX_PROJECT_ROOT_WALK_DEPTH`` parents): a
      defensive fallback for paths outside ``$HOME``.

    ``.plano/`` takes precedence over ``.git/`` across the whole walk: the
    nearest ancestor containing ``.plano/`` wins even if a closer ancestor
    contains ``.git/``.

    ``$HOME/.plano`` is deliberately *not* treated as a project marker. It is
    the user-scope configuration directory (user skills and the trust file
    live there), so counting it would make every directory under ``$HOME``
    that lacks its own ``.plano/`` resolve to ``$HOME`` -- overriding a nearer
    ``.git/`` and the fallback below. ``$HOME/.git`` is still honoured.

    Args:
        start: Directory to start from; defaults to the current directory.

    Returns:
        The resolved project root, or resolved ``start`` (or cwd) if no
        marker is found within the bounds.
    """
    base = Path(start or Path.cwd()).resolve()
    try:
        home = Path(os.path.expanduser("~")).resolve()
    except (OSError, RuntimeError):
        home = None

    # Collect the bounded ancestor chain once; both marker passes reuse it.
    ancestors: list[Path] = []
    cur = base
    for _ in range(_MAX_PROJECT_ROOT_WALK_DEPTH + 1):
        ancestors.append(cur)
        if cur == home or cur == cur.parent:
            break
        cur = cur.parent

    for cur in ancestors:
        # Skip $HOME here: ~/.plano is user config, not a project marker.
        if cur != home and (cur / ".plano").exists():
            return cur
    for cur in ancestors:
        if (cur / ".git").exists():
            return cur
    return base
def parse_skill_md(path: Path) -> tuple[Skill | None, list[SkillDiagnostic]]:
    """Parse a single SKILL.md file leniently.

    Hard failures (skill is ``None``): the file cannot be read or is not
    valid UTF-8, the YAML frontmatter is missing or unparseable, or the
    required ``description`` field is absent or blank. Everything else --
    missing or non-conforming ``name``, a name that differs from the parent
    directory -- only produces ``"warn"`` diagnostics.

    Args:
        path: Path to the SKILL.md file.

    Returns:
        ``(skill, diagnostics)``. The returned skill always has
        ``scope="project"``; callers override it as needed.
    """
    diagnostics: list[SkillDiagnostic] = []

    def warn(message: str) -> None:
        diagnostics.append(SkillDiagnostic("warn", message, path))

    def fail(message: str) -> tuple[None, list[SkillDiagnostic]]:
        diagnostics.append(SkillDiagnostic("error", message, path))
        return None, diagnostics

    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError; without it one
        # badly-encoded SKILL.md would raise out of the whole discovery run.
        return fail(f"failed to read SKILL.md: {exc}")

    frontmatter, body = _split_frontmatter(text)
    if frontmatter is None:
        return fail("missing YAML frontmatter")

    data = _parse_yaml_lenient(frontmatter, path, diagnostics)
    if data is None:
        # _parse_yaml_lenient has already recorded the error diagnostic.
        return None, diagnostics

    description = data.get("description")
    if not isinstance(description, str) or not description.strip():
        return fail("skill is missing required 'description' field")

    parent_name = path.parent.name
    name = data.get("name")
    if not isinstance(name, str) or not name.strip():
        warn(f"missing 'name' field; falling back to parent directory '{parent_name}'")
        name = parent_name
    name = name.strip()

    if len(name) > 64:
        warn("skill name exceeds 64 characters")
    if not _NAME_PATTERN.match(name):
        warn(
            f"skill name '{name}' violates spec naming rules "
            "(lowercase alphanumeric + hyphens, no leading/trailing/double hyphens)"
        )
    if name != parent_name:
        warn(f"skill name '{name}' does not match parent directory '{parent_name}'")

    # Metadata is normalised to str -> str; a non-mapping value is dropped.
    metadata_raw = data.get("metadata")
    metadata = {}
    if isinstance(metadata_raw, dict):
        metadata = {str(k): str(v) for k, v in metadata_raw.items()}

    skill = Skill(
        name=name,
        description=description.strip(),
        location=path.resolve(),
        base_dir=path.parent.resolve(),
        body=body,
        scope="project",  # may be overridden by caller
        compatibility=_string_field(data.get("compatibility")),
        license=_string_field(data.get("license")),
        metadata=metadata,
        allowed_tools=_string_field(data.get("allowed-tools")),
    )
    return skill, diagnostics
def _split_frontmatter(text: str) -> tuple[str | None, str]:
    """Separate a leading ``---`` delimited frontmatter block from the body.

    Returns ``(frontmatter, body)`` with surrounding newlines stripped from
    the body, or ``(None, text)`` when `text` has no opening fence or no
    closing fence.
    """
    match = None
    if text.startswith("---"):
        match = re.match(
            r"^---\s*\r?\n(.*?)\r?\n---\s*(?:\r?\n)?(.*)$", text, re.DOTALL
        )
    if match is None:
        return None, text
    raw_yaml, remainder = match.groups()
    return raw_yaml, remainder.strip("\n")
def _parse_yaml_lenient(
    frontmatter: str, path: Path, diagnostics: list[SkillDiagnostic]
) -> dict | None:
    """Load frontmatter as a YAML mapping, retrying once with quoted fields.

    On a YAML error the frontmatter is rewritten by
    `_retry_quote_problem_fields` and parsed again. Any failure appends an
    "error" diagnostic to `diagnostics` and returns ``None``.
    """

    def reject(message: str) -> None:
        diagnostics.append(SkillDiagnostic("error", message, path))
        return None

    try:
        loaded = yaml.safe_load(frontmatter)
    except yaml.YAMLError as first_error:
        quoted = _retry_quote_problem_fields(frontmatter)
        if quoted is None:
            # Nothing could be rewritten, so a retry would fail identically.
            return reject(f"YAML parse error: {first_error}")
        try:
            loaded = yaml.safe_load(quoted)
        except yaml.YAMLError as second_error:
            return reject(f"YAML parse error (after retry): {second_error}")

    if isinstance(loaded, dict):
        return loaded
    return reject("frontmatter is not a YAML mapping")
_PROBLEM_FIELDS = ("description", "compatibility")


def _retry_quote_problem_fields(frontmatter: str) -> str | None:
    """Double-quote bare values of the fields listed in ``_PROBLEM_FIELDS``.

    Only single-line ``key: value`` entries are rewritten; backslashes and
    double quotes inside the value are escaped. Values already wrapped in
    matching single or double quotes, and empty values, are left alone.

    Returns the rewritten frontmatter, or ``None`` if no line was changed.
    """
    rewritten: list[str] = []
    touched = False
    for raw in frontmatter.splitlines():
        parsed = re.match(r"^(\w[\w-]*)\s*:\s*(.*)$", raw)
        field_name = parsed.group(1) if parsed else None
        if field_name in _PROBLEM_FIELDS:
            payload = parsed.group(2).rstrip()
            already_quoted = any(
                payload.startswith(quote) and payload.endswith(quote)
                for quote in ("'", '"')
            )
            if payload and not already_quoted:
                safe = payload.replace("\\", "\\\\").replace('"', '\\"')
                rewritten.append(f'{field_name}: "{safe}"')
                touched = True
                continue
        rewritten.append(raw)
    return "\n".join(rewritten) if touched else None
def _string_field(value) -> str | None:
    """Coerce an optional frontmatter value to a stripped string.

    ``None`` and values whose string form is blank both map to ``None``.
    """
    text = "" if value is None else str(value).strip()
    return text if text else None
def _iter_skill_dirs(root: Path) -> Iterable[Path]:
    """Yield the non-hidden child directories of `root`, sorted by name.

    Yields nothing when `root` is not a directory or cannot be listed. At
    most ``MAX_DIRS_SCANNED`` entries (files included) are examined; once
    the budget is exceeded a warning is logged and iteration stops.
    """
    # is_dir() is already False for a path that does not exist.
    if not root.is_dir():
        return
    try:
        entries = sorted(root.iterdir(), key=lambda entry: entry.name)
    except OSError:
        return
    for seen, entry in enumerate(entries, start=1):
        if seen > MAX_DIRS_SCANNED:
            log.warning(
                "exceeded max scan budget (%d) while looking for skills in %s",
                MAX_DIRS_SCANNED,
                root,
            )
            break
        if entry.is_dir() and not entry.name.startswith("."):
            yield entry
def _load_skill_tier(
    root: Path,
    scope: str,
    skills_by_name: dict[str, Skill],
    diagnostics: list[SkillDiagnostic],
) -> None:
    """Parse every ``<root>/<dir>/SKILL.md`` and merge it into `skills_by_name`.

    Each parsed skill is tagged with `scope` and replaces any entry already
    registered under the same name; every replacement is reported as a
    "warn" diagnostic so nothing is dropped silently.
    """
    for skill_dir in _iter_skill_dirs(root):
        skill_md = skill_dir / "SKILL.md"
        if not skill_md.exists():
            continue
        skill, diags = parse_skill_md(skill_md)
        diagnostics.extend(diags)
        if skill is None:
            continue
        skill = _set_scope(skill, scope)
        existing = skills_by_name.get(skill.name)
        if existing is not None:
            if existing.scope == scope:
                # Two directories in one tier declare the same name; the
                # later one (directories are visited in name order) wins.
                message = (
                    f"duplicate {scope}-scope skill name '{skill.name}'; "
                    f"overriding earlier definition at {existing.location}"
                )
            elif scope == "user":
                message = f"user-scope skill '{skill.name}' shadows ~/.agents/skills entry at {existing.location}"
            else:
                message = f"{scope}-scope skill '{skill.name}' shadows {existing.scope}-scope skill at {existing.location}"
            diagnostics.append(SkillDiagnostic("warn", message, skill.location))
        skills_by_name[skill.name] = skill


def discover_skills(
    project_root: Path | None = None,
    include_user_scope: bool = True,
) -> tuple[list[Skill], list[SkillDiagnostic]]:
    """Discover all skills available to the current project.

    Precedence (highest first): project > user > agents. Project-scope
    skills shadow lower tiers with the same name; user-scope shadows
    agents-scope. Both ``~/.plano/skills/`` (Plano-native) and
    ``~/.agents/skills/`` (the universal Agent Skills install location used
    by ``npx skills add``) are treated as auto-trusted user-tier scopes.

    This function does not consult project trust; project-scope skills are
    always loaded here.

    Args:
        project_root: Starting directory handed to `find_project_root`.
        include_user_scope: When False, only project-scope skills are loaded.

    Returns:
        ``(skills, diagnostics)`` with skills sorted by name. Diagnostics
        include parse findings plus a warning for every shadowed or
        duplicated skill name.
    """
    project_root = find_project_root(project_root)
    skills_by_name: dict[str, Skill] = {}
    diagnostics: list[SkillDiagnostic] = []

    # Load lowest precedence first so higher tiers shadow.
    tiers: list[tuple[Path, str]] = []
    if include_user_scope:
        tiers.append((AGENTS_SKILLS_DIR, "agents"))
        tiers.append((USER_SKILLS_DIR, "user"))
    tiers.append((project_root / PROJECT_SKILLS_DIR, "project"))

    for root, scope in tiers:
        _load_skill_tier(root, scope, skills_by_name, diagnostics)

    return sorted(skills_by_name.values(), key=lambda s: s.name), diagnostics
def _set_scope(skill: Skill, scope: str) -> Skill:
    """Return a new ``Skill`` equal to `skill` except for its ``scope``.

    The input is left untouched; field values (including the ``metadata``
    dict) are shared with the copy rather than duplicated.
    """
    # vars() on a dataclass instance holds exactly its init fields.
    attributes = {**vars(skill), "scope": scope}
    return Skill(**attributes)
def total_catalog_size(skills: Iterable[Skill]) -> int:
    """Approximate byte size of the catalog the orchestrator will receive.

    Sums the UTF-8 encoded length of every skill's ``name`` and
    ``description``. Measuring bytes rather than code points keeps the
    figure meaningful against a byte budget when descriptions contain
    non-ASCII text (one CJK character is three bytes, not one).

    Args:
        skills: Skills to measure; may be any iterable, including empty.

    Returns:
        Total size in bytes; 0 for an empty iterable.
    """

    def utf8_len(text: str) -> int:
        # errors="replace" keeps lone surrogates from raising.
        return len(text.encode("utf-8", errors="replace"))

    return sum(utf8_len(s.name) + utf8_len(s.description) for s in skills)
def filter_skills_by_allow_list(
    skills: Iterable[Skill], allow_list: Iterable[str] | None
) -> list[Skill]:
    """Keep only the skills whose `name` is present in `allow_list`.

    ``None`` means "no restriction" and returns every skill. Names in the
    allow-list that match no skill are ignored without complaint here --
    callers warn at config-validation time. Input order is preserved.
    """
    if allow_list is not None:
        permitted = set(allow_list)
        return [skill for skill in skills if skill.name in permitted]
    return list(skills)