"""Agent Skills discovery for Plano.

Parses SKILL.md files from .plano/skills/ (project scope), ~/.plano/skills/
(user scope) and ~/.agents/skills/ (agents scope) following the Agent Skills
specification: https://agentskills.io/specification.md

The parser is intentionally lenient (per the "Adding skills support" guide):
warn on cosmetic issues but only skip a skill when its file cannot be read,
its YAML is unparseable or its required `description` field is missing.
"""

from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable

import yaml

from planoai.utils import getLogger

log = getLogger(__name__)

PROJECT_SKILLS_DIR = Path(".plano") / "skills"
USER_SKILLS_DIR = Path(os.path.expanduser("~/.plano/skills"))
# Universal Agent Skills install location used by `npx skills add` (vercel-labs/add-skill).
# Auto-trusted: same security posture as ~/.plano/skills, no project trust needed.
AGENTS_SKILLS_DIR = Path(os.path.expanduser("~/.agents/skills"))

MAX_CATALOG_BYTES = 5 * 1024
MAX_DIRS_SCANNED = 2000

# One or more lowercase alphanumeric runs joined by single hyphens: rejects
# leading, trailing and consecutive hyphens, matching the warning text emitted
# by `parse_skill_md`.
_NAME_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")


def trusted_projects_file() -> Path:
    """Resolve `~/.plano/trusted_projects.json` at call time.

    Lazy so tests can override $HOME and have the new path picked up; module
    import time would freeze it to the developer's actual home directory.
    """
    return Path(os.path.expanduser("~/.plano/trusted_projects.json"))


def is_project_trusted(project_root: Path) -> bool:
    """Return True if `project_root` is listed in `~/.plano/trusted_projects.json`.

    Project-scope skills come from arbitrary repos and are gated on this trust
    decision (set with `planoai skills trust`). Single source of truth, shared
    between the `skills_cmd` CLI surface and the render pipeline.

    The check fails closed: a missing, unreadable, non-UTF-8 or malformed
    trust file, a `trusted_projects` value that is not a list, and entries
    that are not non-empty strings all count as "not trusted" instead of
    raising.
    """
    path = trusted_projects_file()
    try:
        with path.open("r", encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return False

    if not isinstance(data, dict):
        return False
    trusted = data.get("trusted_projects", [])
    if not isinstance(trusted, list):
        # A bare string would otherwise be iterated character by character.
        return False

    resolved = str(project_root.resolve())
    return any(
        isinstance(entry, str)
        and bool(entry)
        and str(Path(entry).resolve()) == resolved
        for entry in trusted
    )


@dataclass(frozen=True)
class SkillDiagnostic:
    """A single finding produced while parsing or discovering skills."""

    severity: str  # "warn" or "error"
    message: str
    path: Path


@dataclass
class Skill:
    """A parsed SKILL.md: frontmatter fields plus the markdown body."""

    name: str
    description: str
    location: Path  # resolved path of the SKILL.md file
    base_dir: Path  # resolved directory containing the SKILL.md file
    body: str
    scope: str  # "project", "user" or "agents"
    compatibility: str | None = None
    license: str | None = None
    metadata: dict = field(default_factory=dict)
    allowed_tools: str | None = None

    def to_dict(self) -> dict:
        """Serialize to a YAML-friendly dict for embedding in rendered config."""
        return {
            "name": self.name,
            "description": self.description,
            "path": str(self.location),
            "base_dir": str(self.base_dir),
            "scope": self.scope,
            "body": self.body,
            "compatibility": self.compatibility,
            "license": self.license,
            "metadata": dict(self.metadata) if self.metadata else None,
            "allowed_tools": self.allowed_tools,
        }


_MAX_PROJECT_ROOT_WALK_DEPTH = 30


def find_project_root(start: Path | None = None) -> Path:
    """Walk up from ``start`` looking for ``.plano/``, then ``.git/``.

    The walk is bounded so a CLI invocation in a deeply-nested or pathological
    directory does not iterate all the way to ``/`` on every call. Two bounds
    apply, whichever fires first:

    * **$HOME**: when ``start`` is inside the user's home directory, the walk
      stops at ``$HOME`` itself. We never inspect siblings of ``$HOME`` like
      ``/Users`` — picking up a stray ``.git/`` there would be more surprising
      than helpful.
    * **Hard depth cap** (``_MAX_PROJECT_ROOT_WALK_DEPTH`` parents): a
      defensive fallback for paths outside ``$HOME`` (e.g. ``/tmp/...``) so we
      still terminate quickly on absurdly deep trees.

    ``$HOME/.plano`` is never treated as a project marker: it is the
    user-scope configuration directory (it holds ``~/.plano/skills`` and
    ``trusted_projects.json``). Counting it would make every directory under
    ``$HOME`` without its own ``.plano/`` resolve to ``$HOME``, even when a
    nearer ``.git/`` exists. A ``.git/`` directly in ``$HOME`` still counts.

    Falls back to ``start`` (or cwd) if nothing is found.

    This matches how ``npx skills add`` chooses a project root.
    """
    base = Path(start or Path.cwd()).resolve()
    try:
        home = Path(os.path.expanduser("~")).resolve()
    except (OSError, RuntimeError):
        home = None

    def _ancestors(start_dir: Path) -> list[Path]:
        # Collect start_dir and its parents, nearest first, honouring both bounds.
        out: list[Path] = []
        cur = start_dir
        for _ in range(_MAX_PROJECT_ROOT_WALK_DEPTH + 1):
            out.append(cur)
            if home is not None and cur == home:
                break
            if cur == cur.parent:
                # Reached the filesystem root.
                break
            cur = cur.parent
        return out

    ancestors = _ancestors(base)
    # `.plano/` anywhere in the chain wins over `.git/`, hence two passes.
    for cur in ancestors:
        if cur == home:
            # ~/.plano is user-scope config, not a project marker.
            continue
        if (cur / ".plano").exists():
            return cur
    for cur in ancestors:
        if (cur / ".git").exists():
            return cur
    return base


def parse_skill_md(path: Path) -> tuple[Skill | None, list[SkillDiagnostic]]:
    """Parse a single SKILL.md file leniently.

    Returns ``(skill, diagnostics)``. ``skill`` is ``None`` when the file
    cannot be read or decoded, has no frontmatter, has frontmatter that is
    not a parseable YAML mapping, or lacks a non-empty string `description`.
    Problems with `name` only produce "warn" diagnostics. The returned skill
    always has ``scope="project"``; callers override it as needed.
    """
    diagnostics: list[SkillDiagnostic] = []
    try:
        # utf-8-sig decodes plain UTF-8 unchanged and drops a leading BOM,
        # which would otherwise hide the opening `---` fence.
        text = path.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError) as exc:
        diagnostics.append(
            SkillDiagnostic("error", f"failed to read SKILL.md: {exc}", path)
        )
        return None, diagnostics

    frontmatter, body = _split_frontmatter(text)
    if frontmatter is None:
        diagnostics.append(SkillDiagnostic("error", "missing YAML frontmatter", path))
        return None, diagnostics

    data = _parse_yaml_lenient(frontmatter, path, diagnostics)
    if data is None:
        return None, diagnostics

    description = data.get("description")
    if not isinstance(description, str) or not description.strip():
        diagnostics.append(
            SkillDiagnostic(
                "error", "skill is missing required 'description' field", path
            )
        )
        return None, diagnostics

    parent_name = path.parent.name
    name = data.get("name")
    if not isinstance(name, str) or not name.strip():
        diagnostics.append(
            SkillDiagnostic(
                "warn",
                f"missing 'name' field; falling back to parent directory '{parent_name}'",
                path,
            )
        )
        name = parent_name
    name = name.strip()

    # Naming problems are cosmetic: warn but keep the skill.
    if len(name) > 64:
        diagnostics.append(
            SkillDiagnostic("warn", "skill name exceeds 64 characters", path)
        )
    if not _NAME_PATTERN.match(name):
        diagnostics.append(
            SkillDiagnostic(
                "warn",
                f"skill name '{name}' violates spec naming rules "
                "(lowercase alphanumeric + hyphens, no leading/trailing/double hyphens)",
                path,
            )
        )
    if name != parent_name:
        diagnostics.append(
            SkillDiagnostic(
                "warn",
                f"skill name '{name}' does not match parent directory '{parent_name}'",
                path,
            )
        )

    # Metadata keys and values are coerced to strings; non-mapping metadata
    # is ignored.
    metadata_raw = data.get("metadata")
    metadata = {}
    if isinstance(metadata_raw, dict):
        metadata = {str(k): str(v) for k, v in metadata_raw.items()}

    skill = Skill(
        name=name,
        description=description.strip(),
        location=path.resolve(),
        base_dir=path.parent.resolve(),
        body=body,
        scope="project",  # may be overridden by caller
        compatibility=_string_field(data.get("compatibility")),
        license=_string_field(data.get("license")),
        metadata=metadata,
        allowed_tools=_string_field(data.get("allowed-tools")),
    )
    return skill, diagnostics


def _split_frontmatter(text: str) -> tuple[str | None, str]:
    """Split ``text`` into ``(frontmatter, body)`` at the ``---`` fences.

    Returns ``(None, text)`` when the text does not start with a fenced
    frontmatter block. Leading and trailing newlines are stripped from the
    body.
    """
    if not text.startswith("---"):
        return None, text
    m = re.match(r"^---\s*\r?\n(.*?)\r?\n---\s*(?:\r?\n)?(.*)$", text, re.DOTALL)
    if not m:
        return None, text
    return m.group(1), m.group(2).strip("\n")


def _parse_yaml_lenient(
    frontmatter: str, path: Path, diagnostics: list[SkillDiagnostic]
) -> dict | None:
    """Parse frontmatter as a YAML mapping, retrying once with quoted fields.

    On a YAML error the values of `_PROBLEM_FIELDS` are wrapped in quotes and
    the parse is retried. Appends an "error" diagnostic and returns ``None``
    when parsing still fails or the result is not a mapping.
    """
    try:
        data = yaml.safe_load(frontmatter)
    except yaml.YAMLError as exc:
        retried = _retry_quote_problem_fields(frontmatter)
        if retried is None:
            # Nothing to quote, so the original error stands.
            diagnostics.append(
                SkillDiagnostic("error", f"YAML parse error: {exc}", path)
            )
            return None
        try:
            data = yaml.safe_load(retried)
        except yaml.YAMLError as exc2:
            diagnostics.append(
                SkillDiagnostic(
                    "error", f"YAML parse error (after retry): {exc2}", path
                )
            )
            return None
    if not isinstance(data, dict):
        diagnostics.append(
            SkillDiagnostic("error", "frontmatter is not a YAML mapping", path)
        )
        return None
    return data


_PROBLEM_FIELDS = ("description", "compatibility")


def _retry_quote_problem_fields(frontmatter: str) -> str | None:
    """Wrap unquoted values for fields prone to YAML colon-collisions in quotes.

    Only top-level ``key: value`` lines whose key is in `_PROBLEM_FIELDS` are
    rewritten. Returns the rewritten frontmatter, or ``None`` when no line
    was changed (so a retry would be pointless).
    """
    lines = frontmatter.splitlines()
    out: list[str] = []
    changed = False
    for line in lines:
        m = re.match(r"^(\w[\w-]*)\s*:\s*(.*)$", line)
        if m and m.group(1) in _PROBLEM_FIELDS:
            key = m.group(1)
            value = m.group(2).rstrip()
            if value and not (
                (value.startswith("'") and value.endswith("'"))
                or (value.startswith('"') and value.endswith('"'))
            ):
                # Escape backslashes first so the quote escapes stay intact.
                escaped = value.replace("\\", "\\\\").replace('"', '\\"')
                out.append(f'{key}: "{escaped}"')
                changed = True
                continue
        out.append(line)
    if not changed:
        return None
    return "\n".join(out)


def _string_field(value) -> str | None:
    """Return ``value`` as a stripped string, or ``None`` if absent or blank."""
    if value is None:
        return None
    s = str(value).strip()
    return s or None


def _iter_skill_dirs(root: Path) -> Iterable[Path]:
    """Yield the non-hidden immediate subdirectories of ``root`` in name order.

    Yields nothing when ``root`` is not a directory or cannot be listed.
    Every directory entry (files and hidden entries included) counts against
    ``MAX_DIRS_SCANNED``; once the budget is exceeded a warning is logged and
    iteration stops.
    """
    if not root.is_dir():
        return
    try:
        children = sorted(root.iterdir(), key=lambda p: p.name)
    except OSError:
        return
    for count, child in enumerate(children, start=1):
        if count > MAX_DIRS_SCANNED:
            log.warning(
                "exceeded max scan budget (%d) while looking for skills in %s",
                MAX_DIRS_SCANNED,
                root,
            )
            break
        if not child.is_dir():
            continue
        if child.name.startswith("."):
            continue
        yield child


def _iter_scope_skills(
    root: Path, scope: str, diagnostics: list[SkillDiagnostic]
) -> Iterable[Skill]:
    """Yield skills parsed from ``root/*/SKILL.md``, tagged with ``scope``.

    Parse diagnostics for each file are appended to ``diagnostics`` before
    the corresponding skill is yielded; files that fail to parse yield
    nothing.
    """
    for skill_dir in _iter_skill_dirs(root):
        skill_md = skill_dir / "SKILL.md"
        if not skill_md.exists():
            continue
        skill, diags = parse_skill_md(skill_md)
        diagnostics.extend(diags)
        if skill is not None:
            yield _set_scope(skill, scope)


def discover_skills(
    project_root: Path | None = None,
    include_user_scope: bool = True,
) -> tuple[list[Skill], list[SkillDiagnostic]]:
    """Discover all skills available to the current project.

    Precedence (highest first): project > user > agents. Project-scope skills
    shadow lower tiers with the same name; user-scope shadows agents-scope.
    Both ``~/.plano/skills/`` (Plano-native) and ``~/.agents/skills/`` (the
    universal Agent Skills install location used by ``npx skills add``) are
    treated as auto-trusted user-tier scopes.

    ``project_root`` is resolved through `find_project_root`. When
    ``include_user_scope`` is false only the project directory is scanned.

    Returns ``(skills, diagnostics)`` sorted by name.
    """
    project_root = find_project_root(project_root)
    project_dir = project_root / PROJECT_SKILLS_DIR

    skills_by_name: dict[str, Skill] = {}
    diagnostics: list[SkillDiagnostic] = []

    if include_user_scope:
        # Load lowest precedence first so higher tiers shadow.
        for skill in _iter_scope_skills(AGENTS_SKILLS_DIR, "agents", diagnostics):
            skills_by_name[skill.name] = skill

        for skill in _iter_scope_skills(USER_SKILLS_DIR, "user", diagnostics):
            existing = skills_by_name.get(skill.name)
            if existing is not None and existing.scope == "agents":
                diagnostics.append(
                    SkillDiagnostic(
                        "warn",
                        f"user-scope skill '{skill.name}' shadows ~/.agents/skills entry at {existing.location}",
                        skill.location,
                    )
                )
            skills_by_name[skill.name] = skill

    for skill in _iter_scope_skills(project_dir, "project", diagnostics):
        existing = skills_by_name.get(skill.name)
        if existing is not None and existing.scope in ("user", "agents"):
            diagnostics.append(
                SkillDiagnostic(
                    "warn",
                    f"project-scope skill '{skill.name}' shadows {existing.scope}-scope skill at {existing.location}",
                    skill.location,
                )
            )
        skills_by_name[skill.name] = skill

    return sorted(skills_by_name.values(), key=lambda s: s.name), diagnostics


def _set_scope(skill: Skill, scope: str) -> Skill:
    """Return a copy of ``skill`` with its ``scope`` replaced.

    The copy is shallow: the ``metadata`` dict is shared with the original.
    """
    return Skill(
        name=skill.name,
        description=skill.description,
        location=skill.location,
        base_dir=skill.base_dir,
        body=skill.body,
        scope=scope,
        compatibility=skill.compatibility,
        license=skill.license,
        metadata=skill.metadata,
        allowed_tools=skill.allowed_tools,
    )


def total_catalog_size(skills: Iterable[Skill]) -> int:
    """Approximate byte size of the catalog the orchestrator will receive.

    Sums the UTF-8 encoded length of each skill's name and description, so
    non-ASCII text is counted in bytes rather than characters. Any
    serialization overhead is not included.
    """
    return sum(
        len(s.name.encode("utf-8")) + len(s.description.encode("utf-8"))
        for s in skills
    )


def filter_skills_by_allow_list(
    skills: Iterable[Skill], allow_list: Iterable[str] | None
) -> list[Skill]:
    """Filter skills to those whose `name` appears in `allow_list`.

    If `allow_list` is None, returns all skills. Unknown names are silently
    dropped — callers warn at config-validation time.
    """
    if allow_list is None:
        return list(skills)
    allowed = set(allow_list)
    return [s for s in skills if s.name in allowed]