refactor(agents): colocate main_agent-only kernel into main_agent/

Move modules out of agents/shared/ that are consumed by a single package
(main_agent), placing each next to its only consumer instead of in a
"shared" grab-bag:

- agent_cache.py        -> main_agent/runtime/agent_cache_store.py
- connector_searchable_types.py -> main_agent/runtime/
- plugin_loader.py + plugins/    -> main_agent/plugins/
- skills/ + skills_backends.py   -> main_agent/skills/
- tools/invalid_tool.py          -> main_agent/tools/

Drop the skills_backends re-export from the shared middleware barrel and
repoint all consumers + tests. No behavior change; import-all,
error-contract, and the moved tests stay green.
This commit is contained in:
CREDO23 2026-06-04 21:25:39 +02:00
parent c51aca6ccc
commit a7d7155039
24 changed files with 33 additions and 46 deletions

View file

@ -1,356 +0,0 @@
"""TTL-LRU cache for compiled SurfSense deep agents.
Why this exists
---------------
``create_surfsense_deep_agent`` runs a 4-5 second pipeline on EVERY chat
turn:
1. Discover connectors & document types from Postgres (~50-200ms)
2. Build the tool list (built-in + MCP) (~200ms-1.7s)
3. Compose the system prompt
4. Construct ~15 middleware instances (CPU)
5. Eagerly compile the general-purpose subagent
(``SubAgentMiddleware.__init__`` calls ``create_agent`` synchronously,
which builds a second LangGraph + Pydantic schemas ~1.5-2s of pure
CPU work)
6. Compile the outer LangGraph
For a single thread, all six steps produce the SAME object on every turn
unless the user has changed their LLM config, toggled a feature flag,
added a connector, etc. The right answer is to compile ONCE per
"agent shape" and reuse the resulting :class:`CompiledStateGraph` for
every subsequent turn on the same thread.
Why a per-thread key (not a global pool)
----------------------------------------
Most middleware in the SurfSense stack captures per-thread state in
``__init__`` closures (``thread_id``, ``user_id``, ``search_space_id``,
``filesystem_mode``, ``mentioned_document_ids``). Cross-thread reuse
would silently leak state across users and threads. Keying the cache on
``(llm_config_id, thread_id, ...)`` gives us safe reuse for repeated
turns on the same thread without changing any middleware's behavior.
Phase 2 will move those captured fields onto :class:`SurfSenseContextSchema`
(read via ``runtime.context``) so the cache can collapse to a single
``(llm_config_id, search_space_id, ...)`` key shared across threads. Until
then, per-thread keying is the only safe option.
Cache shape
-----------
* TTL-LRU: entries auto-expire after ``ttl_seconds`` (default 1800s, 30
minutes matches a typical chat session). ``maxsize`` (default 256)
caps memory; LRU evicts least-recently-used on overflow.
* In-flight de-duplication: per-key :class:`asyncio.Lock` so concurrent
cold misses on the same key wait for the first build instead of
building N times.
* Process-local: this is an in-memory cache. Multi-replica deployments
pay the build cost once per replica per key. That's fine; the working
set per replica is small (one entry per active thread on that replica).
Telemetry
---------
Every lookup logs ``[agent_cache]`` lines through ``surfsense.perf``:
* ``hit`` cache hit, microseconds-fast
* ``miss`` first build for this key, includes build duration
* ``stale`` entry was found but expired; rebuilt
* ``evict`` LRU eviction (size-limited)
* ``size`` current cache occupancy at lookup time
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import os
import time
from collections import OrderedDict
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any
from app.utils.perf import get_perf_logger
logger = logging.getLogger(__name__)
_perf_log = get_perf_logger()
# ---------------------------------------------------------------------------
# Public API: signature helpers (cache key components)
# ---------------------------------------------------------------------------
def stable_hash(*parts: Any) -> str:
"""Compute a deterministic SHA1 of the str repr of ``parts``.
Used for cache key components that need a fixed-width representation
(system prompt, tool list, etc.). SHA1 is fine here this is not a
security boundary, just a content fingerprint.
"""
h = hashlib.sha1(usedforsecurity=False)
for p in parts:
h.update(repr(p).encode("utf-8", errors="replace"))
h.update(b"\x1f") # ASCII unit separator between parts
return h.hexdigest()
def tools_signature(
tools: list[Any] | tuple[Any, ...],
*,
available_connectors: list[str] | None,
available_document_types: list[str] | None,
) -> str:
"""Hash the bound-tool surface for cache-key purposes.
The signature changes whenever:
* A tool is added or removed from the bound list (built-in toggles,
MCP tools loaded for the user changes, gating rules flip, etc.).
* The available connectors / document types for the search space
change (new connector added, last connector removed, new document
type indexed). Connector gating derives disabled tools from
``available_connectors``, so the tool surface is technically already
covered but we hash the connector list separately so an empty-list
"no tools changed" situation still rotates the key when, say, the user
re-adds a connector that gates a tool we were already not exposing.
Stays stable across:
* Process restarts (tool names + descriptions are static).
* Different replicas (everyone gets the same hash for the same
inputs).
"""
tool_descriptors = sorted(
(getattr(t, "name", repr(t)), getattr(t, "description", "")) for t in tools
)
connectors = sorted(available_connectors or [])
doc_types = sorted(available_document_types or [])
return stable_hash(tool_descriptors, connectors, doc_types)
def flags_signature(flags: Any) -> str:
"""Hash the resolved :class:`AgentFeatureFlags` dataclass.
Frozen dataclasses are deterministically reprable, so a SHA1 of their
repr is a stable fingerprint. Restart safe (flags are read once at
process boot).
"""
return stable_hash(repr(flags))
def system_prompt_hash(system_prompt: str) -> str:
"""Hash a system prompt string. Cheap, ~30µs for typical prompts."""
return hashlib.sha1(
system_prompt.encode("utf-8", errors="replace"),
usedforsecurity=False,
).hexdigest()
# ---------------------------------------------------------------------------
# Cache implementation
# ---------------------------------------------------------------------------
@dataclass
class _Entry:
value: Any
created_at: float
last_used_at: float
class _AgentCache:
"""In-process TTL-LRU cache with per-key in-flight de-duplication.
NOT THREAD-SAFE in the multithreading sense designed for a single
asyncio event loop. Uvicorn runs one event loop per worker process,
so this is fine; multi-worker deployments simply each maintain their
own cache.
"""
def __init__(self, *, maxsize: int, ttl_seconds: float) -> None:
self._maxsize = maxsize
self._ttl = ttl_seconds
self._entries: OrderedDict[str, _Entry] = OrderedDict()
# One lock per key — guards "build" so concurrent cold misses on
# the same key wait for the first build instead of all racing.
self._locks: dict[str, asyncio.Lock] = {}
def _now(self) -> float:
return time.monotonic()
def _is_fresh(self, entry: _Entry) -> bool:
return (self._now() - entry.created_at) < self._ttl
def _evict_if_full(self) -> None:
while len(self._entries) >= self._maxsize:
evicted_key, _ = self._entries.popitem(last=False)
self._locks.pop(evicted_key, None)
_perf_log.info(
"[agent_cache] evict key=%s reason=lru size=%d",
_short(evicted_key),
len(self._entries),
)
def _touch(self, key: str, entry: _Entry) -> None:
entry.last_used_at = self._now()
self._entries.move_to_end(key, last=True)
async def get_or_build(
self,
key: str,
*,
builder: Callable[[], Awaitable[Any]],
) -> Any:
"""Return the cached value for ``key`` or call ``builder()`` to make it.
``builder`` MUST be idempotent concurrent cold misses on the
same key collapse to a single ``builder()`` call (the others
wait on the in-flight lock and observe the populated entry on
wake).
"""
# Fast path: hot hit.
entry = self._entries.get(key)
if entry is not None and self._is_fresh(entry):
self._touch(key, entry)
_perf_log.info(
"[agent_cache] hit key=%s age=%.1fs size=%d",
_short(key),
self._now() - entry.created_at,
len(self._entries),
)
return entry.value
# Stale entry — drop it; rebuild below.
if entry is not None and not self._is_fresh(entry):
_perf_log.info(
"[agent_cache] stale key=%s age=%.1fs ttl=%.0fs",
_short(key),
self._now() - entry.created_at,
self._ttl,
)
self._entries.pop(key, None)
# Slow path: serialize concurrent misses for the same key.
lock = self._locks.setdefault(key, asyncio.Lock())
async with lock:
# Double-check after acquiring the lock — another waiter may
# have populated the entry while we slept.
entry = self._entries.get(key)
if entry is not None and self._is_fresh(entry):
self._touch(key, entry)
_perf_log.info(
"[agent_cache] hit key=%s age=%.1fs size=%d coalesced=true",
_short(key),
self._now() - entry.created_at,
len(self._entries),
)
return entry.value
t0 = time.perf_counter()
try:
value = await builder()
except BaseException:
# Don't cache failed builds; let the next caller retry.
_perf_log.warning(
"[agent_cache] build_failed key=%s elapsed=%.3fs",
_short(key),
time.perf_counter() - t0,
)
raise
elapsed = time.perf_counter() - t0
# Insert + evict.
self._evict_if_full()
now = self._now()
self._entries[key] = _Entry(value=value, created_at=now, last_used_at=now)
self._entries.move_to_end(key, last=True)
_perf_log.info(
"[agent_cache] miss key=%s build=%.3fs size=%d",
_short(key),
elapsed,
len(self._entries),
)
return value
def invalidate(self, key: str) -> bool:
"""Drop a single entry; return True if anything was removed."""
removed = self._entries.pop(key, None) is not None
self._locks.pop(key, None)
if removed:
_perf_log.info(
"[agent_cache] invalidate key=%s size=%d",
_short(key),
len(self._entries),
)
return removed
def invalidate_prefix(self, prefix: str) -> int:
"""Drop every entry whose key starts with ``prefix``. Returns count."""
keys = [k for k in self._entries if k.startswith(prefix)]
for k in keys:
self._entries.pop(k, None)
self._locks.pop(k, None)
if keys:
_perf_log.info(
"[agent_cache] invalidate_prefix prefix=%s removed=%d size=%d",
_short(prefix),
len(keys),
len(self._entries),
)
return len(keys)
def clear(self) -> None:
n = len(self._entries)
self._entries.clear()
self._locks.clear()
if n:
_perf_log.info("[agent_cache] clear removed=%d", n)
def stats(self) -> dict[str, Any]:
return {
"size": len(self._entries),
"maxsize": self._maxsize,
"ttl_seconds": self._ttl,
}
def _short(key: str, n: int = 16) -> str:
"""Truncate keys for log lines so they don't blow up log volume."""
return key if len(key) <= n else f"{key[:n]}..."
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_DEFAULT_MAXSIZE = int(os.getenv("SURFSENSE_AGENT_CACHE_MAXSIZE", "256"))
_DEFAULT_TTL = float(os.getenv("SURFSENSE_AGENT_CACHE_TTL_SECONDS", "1800"))
_cache: _AgentCache = _AgentCache(maxsize=_DEFAULT_MAXSIZE, ttl_seconds=_DEFAULT_TTL)
def get_cache() -> _AgentCache:
"""Return the process-wide compiled-agent cache singleton."""
return _cache
def reload_for_tests(*, maxsize: int = 256, ttl_seconds: float = 1800.0) -> _AgentCache:
"""Replace the singleton with a fresh cache. Tests only."""
global _cache
_cache = _AgentCache(maxsize=maxsize, ttl_seconds=ttl_seconds)
return _cache
__all__ = [
"flags_signature",
"get_cache",
"reload_for_tests",
"stable_hash",
"system_prompt_hash",
"tools_signature",
]

View file

@ -1,100 +0,0 @@
"""Map configured connectors to the searchable document/connector types.
This is agent-agnostic infrastructure shared by every agent factory (single-
and multi-agent). It translates the connectors a search space has enabled into
the set of searchable type strings that pre-search middleware and ``web_search``
understand, and always layers in the document types that exist independently of
any connector (uploads, notes, extension captures, YouTube).
It lives in its own module rather than inside a specific agent factory so
that retiring or moving any single agent never disturbs the others' access to
this mapping.
"""
from __future__ import annotations
from typing import Any
# Maps SearchSourceConnectorType enum values to the searchable document/connector types
# used by pre-search middleware and web_search.
# Live search connectors (TAVILY_API, LINKUP_API, BAIDU_SEARCH_API) are routed to
# the web_search tool; all others are considered local/indexed data.
_CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
# Live search connectors (handled by web_search tool)
"TAVILY_API": "TAVILY_API",
"LINKUP_API": "LINKUP_API",
"BAIDU_SEARCH_API": "BAIDU_SEARCH_API",
# Local/indexed connectors (handled by KB pre-search middleware)
"SLACK_CONNECTOR": "SLACK_CONNECTOR",
"TEAMS_CONNECTOR": "TEAMS_CONNECTOR",
"NOTION_CONNECTOR": "NOTION_CONNECTOR",
"GITHUB_CONNECTOR": "GITHUB_CONNECTOR",
"LINEAR_CONNECTOR": "LINEAR_CONNECTOR",
"DISCORD_CONNECTOR": "DISCORD_CONNECTOR",
"JIRA_CONNECTOR": "JIRA_CONNECTOR",
"CONFLUENCE_CONNECTOR": "CONFLUENCE_CONNECTOR",
"CLICKUP_CONNECTOR": "CLICKUP_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR": "GOOGLE_CALENDAR_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR": "GOOGLE_GMAIL_CONNECTOR",
"GOOGLE_DRIVE_CONNECTOR": "GOOGLE_DRIVE_FILE", # Connector type differs from document type
"AIRTABLE_CONNECTOR": "AIRTABLE_CONNECTOR",
"LUMA_CONNECTOR": "LUMA_CONNECTOR",
"ELASTICSEARCH_CONNECTOR": "ELASTICSEARCH_CONNECTOR",
"WEBCRAWLER_CONNECTOR": "CRAWLED_URL", # Maps to document type
"BOOKSTACK_CONNECTOR": "BOOKSTACK_CONNECTOR",
"CIRCLEBACK_CONNECTOR": "CIRCLEBACK", # Connector type differs from document type
"OBSIDIAN_CONNECTOR": "OBSIDIAN_CONNECTOR",
"DROPBOX_CONNECTOR": "DROPBOX_FILE", # Connector type differs from document type
"ONEDRIVE_CONNECTOR": "ONEDRIVE_FILE", # Connector type differs from document type
# Composio connectors (unified to native document types).
# Reverse of NATIVE_TO_LEGACY_DOCTYPE in app.db.
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "GOOGLE_DRIVE_FILE",
"COMPOSIO_GMAIL_CONNECTOR": "GOOGLE_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "GOOGLE_CALENDAR_CONNECTOR",
}
# Document types that don't come from SearchSourceConnector but should always be searchable
_ALWAYS_AVAILABLE_DOC_TYPES: list[str] = [
"EXTENSION", # Browser extension data
"FILE", # Uploaded files
"NOTE", # User notes
"YOUTUBE_VIDEO", # YouTube videos
]
def map_connectors_to_searchable_types(
connector_types: list[Any],
) -> list[str]:
"""
Map SearchSourceConnectorType enums to searchable document/connector types.
This function:
1. Converts connector type enums to their searchable counterparts
2. Includes always-available document types (EXTENSION, FILE, NOTE, YOUTUBE_VIDEO)
3. Deduplicates while preserving order
Args:
connector_types: List of SearchSourceConnectorType enum values
Returns:
List of searchable connector/document type strings
"""
result_set: set[str] = set()
result_list: list[str] = []
# Add always-available document types first
for doc_type in _ALWAYS_AVAILABLE_DOC_TYPES:
if doc_type not in result_set:
result_set.add(doc_type)
result_list.append(doc_type)
# Map each connector type to its searchable equivalent
for ct in connector_types:
# Handle both enum and string types
ct_str = ct.value if hasattr(ct, "value") else str(ct)
searchable = _CONNECTOR_TYPE_TO_SEARCHABLE.get(ct_str)
if searchable and searchable not in result_set:
result_set.add(searchable)
result_list.append(searchable)
return result_list

View file

@ -45,12 +45,6 @@ from app.agents.shared.middleware.noop_injection import NoopInjectionMiddleware
from app.agents.shared.middleware.otel_span import OtelSpanMiddleware
from app.agents.shared.middleware.permission import PermissionMiddleware
from app.agents.shared.middleware.retry_after import RetryAfterMiddleware
from app.agents.shared.middleware.skills_backends import (
BuiltinSkillsBackend,
SearchSpaceSkillsBackend,
build_skills_backend_factory,
default_skills_sources,
)
from app.agents.shared.middleware.tool_call_repair import (
ToolCallNameRepairMiddleware,
)
@ -58,7 +52,6 @@ from app.agents.shared.middleware.tool_call_repair import (
__all__ = [
"ActionLogMiddleware",
"AnonymousDocumentMiddleware",
"BuiltinSkillsBackend",
"BusyMutexMiddleware",
"ClearToolUsesEdit",
"DedupHITLToolCallsMiddleware",
@ -74,14 +67,11 @@ __all__ = [
"OtelSpanMiddleware",
"PermissionMiddleware",
"RetryAfterMiddleware",
"SearchSpaceSkillsBackend",
"SpillToBackendEdit",
"SpillingContextEditingMiddleware",
"SurfSenseCompactionMiddleware",
"ToolCallNameRepairMiddleware",
"ToolDefinition",
"build_skills_backend_factory",
"commit_staged_filesystem_state",
"create_surfsense_compaction_middleware",
"default_skills_sources",
]

View file

@ -1,340 +0,0 @@
"""Skills backends for SurfSense.
Implements two minimal :class:`deepagents.backends.protocol.BackendProtocol`
subclasses tailored for use with :class:`deepagents.middleware.skills.SkillsMiddleware`.
The middleware only needs four methods to load skills from a backend:
* ``ls_info`` / ``als_info`` list directories under a source path.
* ``download_files`` / ``adownload_files`` fetch ``SKILL.md`` bytes.
Other ``BackendProtocol`` methods (``read``/``write``/``edit``/``grep_raw`` )
default to ``NotImplementedError`` from the base class. They are never reached
by the skills middleware because skill content is rendered into the system
prompt at agent build time, not edited at runtime.
Two backends are provided:
* :class:`BuiltinSkillsBackend` disk-backed read of bundled skills from
``app/agents/shared/skills/builtin/``.
* :class:`SearchSpaceSkillsBackend` a thin read-only wrapper over
:class:`KBPostgresBackend` that filters notes under the privileged folder
``/documents/_skills/``.
Both backends are intentionally read-only: skill authoring happens out of band
(via filesystem or a search-space-admin route), so we never expose
``write`` / ``edit`` / ``upload_files``. The base class' ``NotImplementedError``
gives a clean failure mode if anything tries.
"""
from __future__ import annotations
import contextlib
import logging
from collections.abc import Callable
from dataclasses import replace
from pathlib import Path
from typing import TYPE_CHECKING
from deepagents.backends.composite import CompositeBackend
from deepagents.backends.protocol import (
BackendProtocol,
FileDownloadResponse,
FileInfo,
)
from deepagents.backends.state import StateBackend
if TYPE_CHECKING:
from langchain.tools import ToolRuntime
from app.agents.shared.middleware.kb_postgres_backend import KBPostgresBackend
logger = logging.getLogger(__name__)
# Limit per Agent Skills spec; matches deepagents.middleware.skills.MAX_SKILL_FILE_SIZE.
_MAX_SKILL_FILE_SIZE = 10 * 1024 * 1024
def _default_builtin_root() -> Path:
"""Return the absolute path to the bundled builtin skills directory.
Located at ``app/agents/shared/skills/builtin/`` relative to this module
(this module lives at ``app/agents/shared/middleware/skills_backends.py``).
"""
return (Path(__file__).resolve().parent.parent / "skills" / "builtin").resolve()
class BuiltinSkillsBackend(BackendProtocol):
"""Read-only disk-backed skills source.
Maps a virtual ``/skills/builtin/`` namespace onto a directory on local disk,
where each skill is its own subdirectory containing a ``SKILL.md`` file::
<root>/<skill-name>/SKILL.md
The middleware calls :meth:`als_info` with the source path and expects a
``list[FileInfo]`` whose ``is_dir=True`` entries are descended into. Then it
calls :meth:`adownload_files` with the synthesized ``SKILL.md`` paths and
parses YAML frontmatter from the returned ``content`` bytes.
Mounting under :class:`~deepagents.backends.composite.CompositeBackend` at
prefix ``/skills/builtin/`` means the middleware can issue paths like
``/skills/builtin/kb-research/SKILL.md`` which the composite strips down to
``/kb-research/SKILL.md`` before forwarding here. We treat any leading
slash as anchoring at :attr:`root`.
"""
def __init__(self, root: Path | str | None = None) -> None:
self.root: Path = Path(root).resolve() if root else _default_builtin_root()
if not self.root.exists():
logger.info(
"BuiltinSkillsBackend root %s does not exist; skills will be empty.",
self.root,
)
def _resolve(self, path: str) -> Path:
"""Resolve a virtual posix path under :attr:`root`, refusing escapes."""
bare = path.lstrip("/")
candidate = (self.root / bare).resolve() if bare else self.root
# Refuse symlink/.. traversal that escapes the root.
try:
candidate.relative_to(self.root)
except ValueError as exc:
raise ValueError(f"path {path!r} escapes builtin skills root") from exc
return candidate
def ls_info(self, path: str) -> list[FileInfo]:
try:
target = self._resolve(path)
except ValueError as exc:
logger.warning("BuiltinSkillsBackend.ls_info refused: %s", exc)
return []
if not target.exists() or not target.is_dir():
return []
infos: list[FileInfo] = []
# Build virtual paths anchored at "/" because CompositeBackend already
# stripped the route prefix before calling us.
target_virtual = (
"/"
if target == self.root
else ("/" + str(target.relative_to(self.root)).replace("\\", "/"))
)
for child in sorted(target.iterdir()):
if child.name == "__pycache__" or child.name.startswith("."):
continue
child_virtual = (
target_virtual.rstrip("/") + "/" + child.name
if target_virtual != "/"
else "/" + child.name
)
info: FileInfo = {
"path": child_virtual,
"is_dir": child.is_dir(),
}
if child.is_file():
with contextlib.suppress(OSError): # pragma: no cover - defensive
info["size"] = child.stat().st_size
infos.append(info)
return infos
def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
responses: list[FileDownloadResponse] = []
for p in paths:
try:
target = self._resolve(p)
except ValueError:
responses.append(FileDownloadResponse(path=p, error="invalid_path"))
continue
if not target.exists():
responses.append(FileDownloadResponse(path=p, error="file_not_found"))
continue
if target.is_dir():
responses.append(FileDownloadResponse(path=p, error="is_directory"))
continue
try:
# Hard cap to avoid loading rogue mega-files into memory.
size = target.stat().st_size
if size > _MAX_SKILL_FILE_SIZE:
logger.warning(
"Builtin skill file %s exceeds %d bytes; truncating.",
target,
_MAX_SKILL_FILE_SIZE,
)
with target.open("rb") as fh:
content = fh.read(_MAX_SKILL_FILE_SIZE)
else:
content = target.read_bytes()
except PermissionError:
responses.append(
FileDownloadResponse(path=p, error="permission_denied")
)
continue
except OSError as exc: # pragma: no cover - defensive
logger.warning("Builtin skill read failed %s: %s", target, exc)
responses.append(FileDownloadResponse(path=p, error="file_not_found"))
continue
responses.append(FileDownloadResponse(path=p, content=content, error=None))
return responses
class SearchSpaceSkillsBackend(BackendProtocol):
"""Read-only view of search-space-authored skills.
Wraps a :class:`KBPostgresBackend` and only ever reads under the privileged
folder ``/documents/_skills/`` (configurable). The folder is intended to be
writable only by search-space admins; this backend never writes.
The skills middleware expects a layout like::
/<source_root>/<skill-name>/SKILL.md
But the KB stores documents like ``/documents/_skills/<name>/SKILL.md``.
We expose the inner namespace by remapping each path. When mounted under
:class:`CompositeBackend` at prefix ``/skills/space/`` the paths the
middleware sees become ``/skills/space/<name>/SKILL.md``; the composite
strips ``/skills/space/`` and hands us ``/<name>/SKILL.md``, which we
rewrite to ``/documents/_skills/<name>/SKILL.md`` before forwarding to the
KB.
No new database table is needed: the privileged folder convention is
enforced server-side outside of this class. We intentionally swallow any
write/edit attempts (the base class raises ``NotImplementedError``).
"""
DEFAULT_KB_ROOT: str = "/documents/_skills"
def __init__(
self,
kb_backend: KBPostgresBackend,
*,
kb_root: str = DEFAULT_KB_ROOT,
) -> None:
self._kb = kb_backend
# Normalize trailing slash off so we can join cleanly.
self._kb_root = kb_root.rstrip("/") or "/"
def _to_kb(self, path: str) -> str:
"""Rewrite a virtual path into the underlying KB namespace."""
bare = path.lstrip("/")
if not bare:
return self._kb_root
return f"{self._kb_root}/{bare}"
def _from_kb(self, kb_path: str) -> str:
"""Rewrite a KB path back into our virtual namespace."""
if not kb_path.startswith(self._kb_root):
return kb_path # pragma: no cover - defensive
rel = kb_path[len(self._kb_root) :]
return rel if rel.startswith("/") else "/" + rel
def ls_info(self, path: str) -> list[FileInfo]:
# KBPostgresBackend exposes only the async API meaningfully; the sync
# path falls back to ``asyncio.to_thread(...)`` in the base class. We
# keep this stub to satisfy abstract resolution; the middleware calls
# ``als_info``.
raise NotImplementedError("SearchSpaceSkillsBackend is async-only")
async def als_info(self, path: str) -> list[FileInfo]:
kb_path = self._to_kb(path)
try:
infos = await self._kb.als_info(kb_path)
except Exception as exc: # pragma: no cover - defensive
logger.warning("SearchSpaceSkillsBackend.als_info failed: %s", exc)
return []
remapped: list[FileInfo] = []
for info in infos:
kb_p = info.get("path", "")
if not kb_p.startswith(self._kb_root):
continue
remapped.append({**info, "path": self._from_kb(kb_p)})
return remapped
def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
raise NotImplementedError("SearchSpaceSkillsBackend is async-only")
async def adownload_files(self, paths: list[str]) -> list[FileDownloadResponse]:
kb_paths = [self._to_kb(p) for p in paths]
responses = await self._kb.adownload_files(kb_paths)
# Re-map response paths back to the virtual namespace so the middleware
# correlates them to the input list correctly.
remapped: list[FileDownloadResponse] = []
for original, resp in zip(paths, responses, strict=True):
remapped.append(replace(resp, path=original))
return remapped
SKILLS_BUILTIN_PREFIX = "/skills/builtin/"
SKILLS_SPACE_PREFIX = "/skills/space/"
def build_skills_backend_factory(
*,
builtin_root: Path | str | None = None,
search_space_id: int | None = None,
) -> Callable[[ToolRuntime], BackendProtocol]:
"""Return a runtime-aware factory for the skills :class:`CompositeBackend`.
When ``search_space_id`` is provided the composite includes a
:class:`SearchSpaceSkillsBackend` route at ``/skills/space/`` over a fresh
per-runtime :class:`KBPostgresBackend`, mirroring how
:func:`build_backend_resolver` constructs the main filesystem backend.
When ``search_space_id`` is ``None`` (e.g., desktop-local mode or unit
tests) only the bundled :class:`BuiltinSkillsBackend` is exposed.
Returning a factory rather than a fixed instance is intentional: the
underlying KB backend depends on per-call ``ToolRuntime`` state
(``staged_dirs``, ``files`` cache, runtime config), so a single shared
instance cannot serve multiple concurrent agent runs.
"""
builtin = BuiltinSkillsBackend(builtin_root)
if search_space_id is None:
def _factory_builtin_only(runtime: ToolRuntime) -> BackendProtocol:
# Default StateBackend is intentionally inert: any path outside the
# ``/skills/builtin/`` route resolves to an empty per-runtime state
# so the SkillsMiddleware can iterate sources without raising.
return CompositeBackend(
default=StateBackend(runtime),
routes={SKILLS_BUILTIN_PREFIX: builtin},
)
return _factory_builtin_only
def _factory_with_space(runtime: ToolRuntime) -> BackendProtocol:
# Imported lazily to avoid a hard dependency at module import time:
# ``KBPostgresBackend`` pulls in DB models, which are unnecessary for
# the unit-tested builtin path.
from app.agents.shared.middleware.kb_postgres_backend import (
KBPostgresBackend,
)
kb = KBPostgresBackend(search_space_id, runtime)
space = SearchSpaceSkillsBackend(kb)
return CompositeBackend(
default=StateBackend(runtime),
routes={
SKILLS_BUILTIN_PREFIX: builtin,
SKILLS_SPACE_PREFIX: space,
},
)
return _factory_with_space
def default_skills_sources() -> list[str]:
"""Return the canonical source list for SkillsMiddleware (built-in then space)."""
return [SKILLS_BUILTIN_PREFIX, SKILLS_SPACE_PREFIX]
__all__ = [
"SKILLS_BUILTIN_PREFIX",
"SKILLS_SPACE_PREFIX",
"BuiltinSkillsBackend",
"SearchSpaceSkillsBackend",
"build_skills_backend_factory",
"default_skills_sources",
]

View file

@ -120,7 +120,9 @@ class ToolCallNameRepairMiddleware(
# Stage 2 — invalid fallback
# Local import keeps the middleware module import-light and avoids any
# tools <-> middleware import-order coupling at module scope.
from app.agents.shared.tools.invalid_tool import INVALID_TOOL_NAME
from app.agents.multi_agent_chat.main_agent.tools.invalid_tool import (
INVALID_TOOL_NAME,
)
if INVALID_TOOL_NAME in registered:
original_args = call.get("args") or {}

View file

@ -1,158 +0,0 @@
"""Entry-point based plugin loader for SurfSense agent middleware.
LangChain's :class:`AgentMiddleware` ABC already covers the practical
surface most plugins need (``before_agent`` / ``before_model`` /
``wrap_tool_call`` / their async counterparts), so a SurfSense-specific
plugin protocol would be redundant. We just need a way to discover and
admit third-party middleware safely.
A plugin is therefore just an installable Python package that registers a
factory callable under the ``surfsense.plugins`` entry-point group:
.. code-block:: toml
# in a plugin package's pyproject.toml
[project.entry-points."surfsense.plugins"]
year_substituter = "my_plugin:make_middleware"
The factory has the signature ``Callable[[PluginContext], AgentMiddleware]``.
It receives a small, sanitized :class:`PluginContext` with the IDs and the
LLM the plugin is allowed to talk to and **never** raw secrets, DB
sessions, or other connectors.
## Trust model
Plugins are loaded **only if** their entry-point ``name`` appears in
``allowed_plugins`` (admin-controlled, sourced from
``global_llm_config.yaml`` or :func:`load_allowed_plugin_names_from_env`).
There is **no env-driven auto-load**. A plugin failure is logged and
isolated; it does not break agent construction.
"""
from __future__ import annotations
import logging
import os
from collections.abc import Iterable
from importlib.metadata import entry_points
from typing import TYPE_CHECKING
from langchain.agents.middleware import AgentMiddleware
if TYPE_CHECKING: # pragma: no cover - type-only
from langchain_core.language_models import BaseChatModel
from app.db import ChatVisibility
logger = logging.getLogger(__name__)
PLUGIN_ENTRY_POINT_GROUP = "surfsense.plugins"
class PluginContext(dict):
"""Sanitized DI bag handed to each plugin factory.
Backed by ``dict`` so plugins can inspect the keys they care about
without coupling to a concrete dataclass shape. Required keys:
* ``search_space_id`` (int)
* ``user_id`` (str | None)
* ``thread_visibility`` (:class:`app.db.ChatVisibility`)
* ``llm`` (:class:`langchain_core.language_models.BaseChatModel`)
The context **never** carries DB sessions, raw secrets, or other
connectors. If a future plugin genuinely needs DB access, that
integration goes through a rate-limited service interface, not
through this bag.
"""
@classmethod
def build(
cls,
*,
search_space_id: int,
user_id: str | None,
thread_visibility: ChatVisibility,
llm: BaseChatModel,
) -> PluginContext:
return cls(
search_space_id=search_space_id,
user_id=user_id,
thread_visibility=thread_visibility,
llm=llm,
)
def load_plugin_middlewares(
ctx: PluginContext,
allowed_plugin_names: Iterable[str],
) -> list[AgentMiddleware]:
"""Discover, allowlist-filter, and instantiate plugin middleware.
For each entry-point in :data:`PLUGIN_ENTRY_POINT_GROUP` whose name is
in ``allowed_plugin_names``, load the factory and call it with ``ctx``.
The factory's return value must be an :class:`AgentMiddleware` instance;
anything else is logged and skipped.
Errors are isolated a plugin that raises during ``ep.load()`` or
factory invocation is logged at ``ERROR`` and ignored. Agent
construction continues with whatever plugins did succeed.
"""
allowed = {name for name in allowed_plugin_names if name}
if not allowed:
return []
out: list[AgentMiddleware] = []
try:
eps = entry_points(group=PLUGIN_ENTRY_POINT_GROUP)
except Exception: # pragma: no cover - defensive (entry_points is robust)
logger.exception("Failed to enumerate plugin entry points")
return []
for ep in eps:
if ep.name not in allowed:
logger.info("Skipping non-allowlisted plugin %s", ep.name)
continue
try:
factory = ep.load()
except Exception:
logger.exception("Failed to load plugin %s", ep.name)
continue
try:
mw = factory(ctx)
except Exception:
logger.exception("Plugin %s factory raised", ep.name)
continue
if not isinstance(mw, AgentMiddleware):
logger.warning(
"Plugin %s returned %s, expected AgentMiddleware; skipping",
ep.name,
type(mw).__name__,
)
continue
out.append(mw)
logger.info("Loaded plugin %s as %s", ep.name, type(mw).__name__)
return out
def load_allowed_plugin_names_from_env() -> set[str]:
"""Read ``SURFSENSE_ALLOWED_PLUGINS`` (comma-separated) into a set.
Provided as a thin convenience for deployments that don't surface plugins
through ``global_llm_config.yaml`` yet. Whitespace is stripped and empty
entries are dropped.
"""
raw = os.environ.get("SURFSENSE_ALLOWED_PLUGINS", "").strip()
if not raw:
return set()
return {token.strip() for token in raw.split(",") if token.strip()}
__all__ = [
"PLUGIN_ENTRY_POINT_GROUP",
"PluginContext",
"load_allowed_plugin_names_from_env",
"load_plugin_middlewares",
]

View file

@ -1,6 +0,0 @@
"""Reference plugins bundled with SurfSense.
These plugins are intentionally small and demonstrative. They are NOT
auto-loaded they ship as examples that a deployment can opt into via
``global_llm_config.yaml`` or ``SURFSENSE_ALLOWED_PLUGINS``.
"""

View file

@ -1,88 +0,0 @@
"""Reference plugin: substitute ``{{year}}`` in tool descriptions.
Demonstrates the :meth:`AgentMiddleware.awrap_tool_call` hook -- the
plugin sees every tool invocation and can rewrite the request *or* the
result. This particular plugin is read-only and only transforms the
*description* the user might see in error messages (no request
mutation).
The plugin is built as a factory function so the entry-point loader can
inject :class:`PluginContext` (containing the agent's LLM, search-space
ID, etc.). The factory signature
``Callable[[PluginContext], AgentMiddleware]`` is the only contract --
SurfSense doesn't define a custom plugin protocol on top of LangChain's
:class:`AgentMiddleware`.
Wire-up in ``pyproject.toml`` (illustrative; the in-repo plugin doesn't
need this -- it's already on the import path)::
[project.entry-points."surfsense.plugins"]
year_substituter = "app.agents.shared.plugins.year_substituter:make_middleware"
"""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from langchain.agents.middleware import AgentMiddleware
if TYPE_CHECKING: # pragma: no cover - type-only
from langchain.agents.middleware.types import ToolCallRequest
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from app.agents.shared.plugin_loader import PluginContext
logger = logging.getLogger(__name__)
class _YearSubstituterMiddleware(AgentMiddleware):
"""Replace ``{{year}}`` in the result text with the current UTC year."""
tools = ()
def __init__(self, year: int | None = None) -> None:
super().__init__()
self._year = str(year if year is not None else datetime.now(UTC).year)
async def awrap_tool_call(
self,
request: ToolCallRequest,
handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command[Any]]],
) -> ToolMessage | Command[Any]:
result = await handler(request)
try:
from langchain_core.messages import ToolMessage
if (
isinstance(result, ToolMessage)
and isinstance(result.content, str)
and "{{year}}" in result.content
):
new_text = result.content.replace("{{year}}", self._year)
result = ToolMessage(
content=new_text,
tool_call_id=result.tool_call_id,
id=result.id,
name=result.name,
status=result.status,
artifact=result.artifact,
)
except Exception: # pragma: no cover - defensive
logger.exception("year_substituter plugin failed; passing original result")
return result
def make_middleware(ctx: PluginContext) -> AgentMiddleware:
"""Plugin factory used by :func:`load_plugin_middlewares`."""
# Plugin is intentionally small so it has no state to threading-protect
# and ignores ``ctx`` beyond demonstrating that the loader passes it in.
_ = ctx
return _YearSubstituterMiddleware()
__all__ = ["make_middleware"]

View file

@ -1,7 +0,0 @@
"""SurfSense built-in agent skills (Anthropic Skills format).
Each subdirectory corresponds to one skill and contains a ``SKILL.md`` file
with YAML frontmatter (name, description, allowed_tools) plus markdown
instructions. The :class:`BuiltinSkillsBackend` exposes them to the
deepagents :class:`SkillsMiddleware`.
"""

View file

@ -1,24 +0,0 @@
---
name: email-drafting
description: Draft an email matching the user's voice, with structured intent and CTA
---
# Email drafting
## When to use this skill
"Draft an email to ...", "reply to this thread", "write a follow-up to X". Plain "summarize the email" is **not** in scope — that's a comprehension task.
## Voice
Search the KB for prior emails from the user to similar audiences (same recipient, same topic class). Mirror tone, opening style, sign-off, and length distribution. If there is no precedent, default to: warm, direct, no filler, short paragraphs, one clear ask.
## Required structure
Every draft includes, in this order:
1. **Subject line** — concrete, ≤ 8 words, no clickbait, no `Re:` unless replying.
2. **Opening (1 sentence)** — context the recipient already shares; never restate what they wrote unless the thread is long.
3. **Body** — the actual point in one short paragraph. Bullets only if there are >3 discrete items.
4. **Single explicit CTA** — what you want the recipient to do, with a soft deadline if relevant.
5. **Sign-off** — match the user's prior closing style.
## Always offer alternatives
End your message with: "Want me to make it shorter, more formal, or add a different angle?" — give the user one obvious next step.

View file

@ -1,23 +0,0 @@
---
name: kb-research
description: Structured approach to finding and synthesizing information from the user's knowledge base
allowed-tools: scrape_webpage, read_file, ls_tree, grep, web_search
---
# Knowledge-base research
## When to use this skill
- The user asks "find/look up/research" something specifically inside their knowledge base.
- The user references documents, notes, repos, or connector data they expect to exist already.
- A multi-document synthesis is required (e.g., "summarize what we've discussed about X across all my notes").
## Plan
1. Decompose the user's question into 2-4 specific, citation-worthy sub-questions.
2. For each sub-question, run **one** targeted KB search (focused on terms the user would have written, not synonyms). Open the most relevant 2-3 documents fully via `read_file` if their excerpts are too short.
3. Use `grep` to find supporting passages in long files instead of re-reading them end to end.
4. Cite every claim with `[citation:chunk_id]` exactly as the chunk tag specifies.
## What good output looks like
- Short paragraphs with inline citations.
- Quoted phrases when wording matters.
- An explicit "Not found in your knowledge base" callout when a sub-question has no support — never fabricate.

View file

@ -1,22 +0,0 @@
---
name: meeting-prep
description: Pull together briefing materials before a scheduled meeting
allowed-tools: web_search, scrape_webpage, read_file
---
# Meeting preparation
## When to use this skill
The user mentions an upcoming meeting, call, or interview and asks you to "prep", "brief me", "pull background", or "what do I need to know about X before tomorrow".
## Output structure
Always produce these sections (omit any with no signal — don't pad):
1. **Attendees & context** — who's in the room, their roles, what they care about. Pull from KB notes about prior interactions; supplement with public profile facts via `web_search` when names or companies are unfamiliar.
2. **Open threads** — outstanding action items, unresolved decisions, last-mentioned blockers from prior conversation history.
3. **Recent moves** — within the last 30 days: relevant launches, hires, news. Cite KB chunks when present, otherwise external sources.
4. **Suggested questions** — 3-5 questions the user could ask, tailored to the open threads and the attendees' likely priorities.
## Source ordering
- Always check the user's KB **first** for prior meeting notes, internal docs, or Slack threads about these attendees.
- Only fall back to `web_search` for *publicly verifiable* facts — never to fabricate a participant's preferences or relationships.

View file

@ -1,23 +0,0 @@
---
name: report-writing
description: How to scope, draft, and revise a Markdown report artifact via generate_report
allowed-tools: generate_report, read_file
---
# Report writing
## When to use this skill
The user explicitly requests a deliverable: "write a report on …", "draft a memo", "produce a brief", "expand the previous report". A creation or modification verb pointed at an artifact is required (see `generate_report`'s when-to-call rules).
## Decision flow
1. **Source strategy.** Decide which `source_strategy` fits:
- `conversation` — substantive Q&A on the topic already in chat.
- `kb_search` — fresh topic; supply 15 precise `search_queries`.
- `auto` — partial conversation context; let the tool fall back.
- `provided` — verbatim source text only.
2. **Style.** Default to `report_style="detailed"` unless the user explicitly asks for "brief", "one page", "500 words".
3. **Revisions.** When modifying an existing report from this conversation, set `parent_report_id` and put the change list in `user_instructions` ("add carbon-capture section", "tighten conclusion").
4. **Never paste the report back into chat** after `generate_report` returns — confirm and let the artifact card render itself.
## Hooks for KB-only mode
If `kb_search`/`auto` returns no results, do **not** silently switch to general knowledge. Surface the gap in your confirmation message.

View file

@ -1,25 +0,0 @@
---
name: slack-summary
description: Distill a Slack channel or thread into actionable summary
---
# Slack summarization
## When to use this skill
The user asks to summarize Slack ("what happened in #eng-platform this week", "what did Alice say about the launch", "catch me up on the design channel").
## Required inputs
Confirm before searching:
- **Which channel(s) or thread(s)?** Don't guess if ambiguous.
- **What time window?** Default to the last 7 days when not specified, but say so.
## Output shape
Produce three concise sections:
1. **Key decisions** — explicit choices that were made, with the deciding message cited.
2. **Open questions** — things asked but not answered, with the asking message cited.
3. **Action items**`@mention` who owes what by when, *only if explicitly stated*. Don't invent assignees.
## What not to do
- Never produce a chronological play-by-play of every message — distill.
- Never quote private messages without flagging them as such.
- If the channel was empty in the time window, say so — don't fabricate filler.

View file

@ -1,53 +0,0 @@
"""
The ``invalid`` fallback tool.
When the model emits a tool call whose name doesn't match any registered
tool, :class:`ToolCallNameRepairMiddleware` rewrites the call to ``invalid``
with the original name and a parser/validation error string. This tool's
execution then returns that error to the model so it can self-correct.
Ported from OpenCode's ``packages/opencode/src/tool/invalid.ts`` —
LangChain has no equivalent fallback path; the default behavior on an
unknown tool name is a hard ``ToolNotFoundError`` which kills the turn.
Critically, the :class:`ToolDefinition` for this tool is **excluded** from
the system-prompt tool list and from ``LLMToolSelectorMiddleware`` selection
(see ``ToolDefinition.always_include`` filtering in the registry) the
model never advertises ``invalid`` as a callable. It only ever shows up
in the tool registry so LangGraph can dispatch the rewritten call.
"""
from __future__ import annotations
from langchain_core.tools import tool
INVALID_TOOL_NAME = "invalid"
INVALID_TOOL_DESCRIPTION = "Do not use"
def _format_invalid_message(tool: str | None, error: str | None) -> str:
"""Return the user-visible error string. Mirrors ``invalid.ts``."""
name = tool or "<unknown>"
detail = error or "(no error message provided)"
return (
f"The arguments provided to the tool `{name}` are invalid: {detail}\n"
f"Read the tool's docstring carefully and try again with valid arguments."
)
@tool(name_or_callable=INVALID_TOOL_NAME, description=INVALID_TOOL_DESCRIPTION)
def invalid_tool(tool: str | None = None, error: str | None = None) -> str:
"""Return a human-readable explanation of a tool-call validation failure.
Activated only when :class:`ToolCallNameRepairMiddleware` rewrites a
failed tool call to ``invalid`` with the original tool name and the
error message produced during validation.
"""
return _format_invalid_message(tool, error)
__all__ = [
"INVALID_TOOL_DESCRIPTION",
"INVALID_TOOL_NAME",
"invalid_tool",
]