mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-06 22:32:39 +02:00
Add builtin memory route slice for delegated agents.
This commit is contained in:
parent
b9bc06e7b4
commit
ff307dd923
7 changed files with 521 additions and 0 deletions
|
|
@ -0,0 +1,54 @@
|
|||
"""`memory` route: ``SubAgent`` spec for deepagents."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
|
||||
from deepagents import SubAgent
|
||||
from langchain_core.language_models import BaseChatModel
|
||||
|
||||
from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import (
|
||||
read_md_file,
|
||||
)
|
||||
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
|
||||
ToolsPermissions,
|
||||
merge_tools_permissions,
|
||||
)
|
||||
from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import (
|
||||
pack_subagent,
|
||||
)
|
||||
|
||||
from .tools.index import load_tools
|
||||
|
||||
NAME = "memory"
|
||||
|
||||
|
||||
def build_subagent(
|
||||
*,
|
||||
dependencies: dict[str, Any],
|
||||
model: BaseChatModel | None = None,
|
||||
extra_middleware: Sequence[Any] | None = None,
|
||||
extra_tools_bucket: ToolsPermissions | None = None,
|
||||
) -> SubAgent:
|
||||
buckets = load_tools(dependencies=dependencies)
|
||||
merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket)
|
||||
tools = [
|
||||
row["tool"]
|
||||
for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"])
|
||||
if row.get("tool") is not None
|
||||
]
|
||||
interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")}
|
||||
description = read_md_file(__package__, "description").strip()
|
||||
if not description:
|
||||
description = "Handles memory tasks for this workspace."
|
||||
system_prompt = read_md_file(__package__, "system_prompt").strip()
|
||||
return pack_subagent(
|
||||
name=NAME,
|
||||
description=description,
|
||||
system_prompt=system_prompt,
|
||||
tools=tools,
|
||||
interrupt_on=interrupt_on,
|
||||
model=model,
|
||||
extra_middleware=extra_middleware,
|
||||
)
|
||||
|
|
@ -0,0 +1 @@
|
|||
Use for storing durable user memory (private team variant selected at runtime).
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
You are the SurfSense memory operations sub-agent.
|
||||
You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis.
|
||||
|
||||
<goal>
|
||||
Persist durable preferences/facts/instructions with `update_memory` while avoiding transient or unsafe storage.
|
||||
</goal>
|
||||
|
||||
<visibility_scope>
|
||||
{{MEMORY_VISIBILITY_POLICY}}
|
||||
</visibility_scope>
|
||||
|
||||
<available_tools>
|
||||
- `update_memory`
|
||||
</available_tools>
|
||||
|
||||
<tool_policy>
|
||||
- Save only durable information with future value.
|
||||
- Do not store transient chatter.
|
||||
- Do not store secrets unless explicitly instructed.
|
||||
- If memory intent is unclear, return `status=blocked` with the missing intent signal.
|
||||
</tool_policy>
|
||||
|
||||
<out_of_scope>
|
||||
- Do not execute non-memory tool actions.
|
||||
- Do not store irrelevant, transient, or speculative information.
|
||||
</out_of_scope>
|
||||
|
||||
<safety>
|
||||
- Prefer minimal-memory writes over over-collection.
|
||||
- Never claim memory was updated unless `update_memory` succeeded.
|
||||
</safety>
|
||||
|
||||
<failure_policy>
|
||||
- On tool failure, return `status=error` with concise recovery steps.
|
||||
- When intent is ambiguous, return `status=blocked` with required disambiguation fields.
|
||||
</failure_policy>
|
||||
|
||||
<output_contract>
|
||||
Return **only** one JSON object (no markdown/prose):
|
||||
{
|
||||
"status": "success" | "partial" | "blocked" | "error",
|
||||
"action_summary": string,
|
||||
"evidence": {
|
||||
"memory_updated": boolean,
|
||||
"memory_category": "preference" | "fact" | "instruction" | null,
|
||||
"stored_summary": string | null
|
||||
},
|
||||
"next_step": string | null,
|
||||
"missing_fields": string[] | null,
|
||||
"assumptions": string[] | null
|
||||
}
|
||||
Rules:
|
||||
- `status=success` -> `next_step=null`, `missing_fields=null`.
|
||||
- `status=partial|blocked|error` -> `next_step` must be non-null.
|
||||
- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null.
|
||||
</output_contract>
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
"""Memory tools: persist user or team markdown memory for later turns."""
|
||||
|
||||
from .update_memory import create_update_memory_tool, create_update_team_memory_tool
|
||||
|
||||
__all__ = [
|
||||
"create_update_memory_tool",
|
||||
"create_update_team_memory_tool",
|
||||
]
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import (
|
||||
ToolsPermissions,
|
||||
)
|
||||
from app.db import ChatVisibility
|
||||
|
||||
from .update_memory import create_update_memory_tool, create_update_team_memory_tool
|
||||
|
||||
|
||||
def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions:
|
||||
resolved_dependencies = {**(dependencies or {}), **kwargs}
|
||||
if resolved_dependencies.get("thread_visibility") == ChatVisibility.SEARCH_SPACE:
|
||||
mem = create_update_team_memory_tool(
|
||||
search_space_id=resolved_dependencies["search_space_id"],
|
||||
db_session=resolved_dependencies["db_session"],
|
||||
llm=resolved_dependencies.get("llm"),
|
||||
)
|
||||
return {"allow": [{"name": getattr(mem, "name", "") or "", "tool": mem}], "ask": []}
|
||||
mem = create_update_memory_tool(
|
||||
user_id=resolved_dependencies["user_id"],
|
||||
db_session=resolved_dependencies["db_session"],
|
||||
llm=resolved_dependencies.get("llm"),
|
||||
)
|
||||
return {"allow": [{"name": getattr(mem, "name", "") or "", "tool": mem}], "ask": []}
|
||||
|
|
@ -0,0 +1,375 @@
|
|||
"""Overwrite one markdown memory document per user or team, with size and shrink guards."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Literal
|
||||
from uuid import UUID
|
||||
|
||||
from langchain_core.messages import HumanMessage
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db import SearchSpace, User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MEMORY_SOFT_LIMIT = 18_000
|
||||
MEMORY_HARD_LIMIT = 25_000
|
||||
|
||||
_SECTION_HEADING_RE = re.compile(r"^##\s+(.+)$", re.MULTILINE)
|
||||
_HEADING_NORMALIZE_RE = re.compile(r"\s+")
|
||||
|
||||
_MARKER_RE = re.compile(r"\[(fact|pref|instr)\]")
|
||||
_BULLET_FORMAT_RE = re.compile(r"^- \(\d{4}-\d{2}-\d{2}\) \[(fact|pref|instr)\] .+$")
|
||||
_PERSONAL_ONLY_MARKERS = {"pref", "instr"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Diff validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _extract_headings(memory: str) -> set[str]:
|
||||
"""Return all ``## …`` heading texts (without the ``## `` prefix)."""
|
||||
return set(_SECTION_HEADING_RE.findall(memory))
|
||||
|
||||
|
||||
def _normalize_heading(heading: str) -> str:
|
||||
"""Normalize heading text for robust scope checks."""
|
||||
return _HEADING_NORMALIZE_RE.sub(" ", heading.strip().lower())
|
||||
|
||||
|
||||
def _validate_memory_scope(
|
||||
content: str, scope: Literal["user", "team"]
|
||||
) -> dict[str, Any] | None:
|
||||
"""Reject personal-only markers ([pref], [instr]) in team memory."""
|
||||
if scope != "team":
|
||||
return None
|
||||
|
||||
markers = set(_MARKER_RE.findall(content))
|
||||
leaked = sorted(markers & _PERSONAL_ONLY_MARKERS)
|
||||
if leaked:
|
||||
tags = ", ".join(f"[{m}]" for m in leaked)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": (
|
||||
f"Team memory cannot include personal markers: {tags}. "
|
||||
"Use [fact] only in team memory."
|
||||
),
|
||||
}
|
||||
return None
|
||||
|
||||
|
||||
def _validate_bullet_format(content: str) -> list[str]:
|
||||
"""Return warnings for bullet lines that don't match the required format.
|
||||
|
||||
Expected: ``- (YYYY-MM-DD) [fact|pref|instr] text``
|
||||
"""
|
||||
warnings: list[str] = []
|
||||
for line in content.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped.startswith("- "):
|
||||
continue
|
||||
if not _BULLET_FORMAT_RE.match(stripped):
|
||||
short = stripped[:80] + ("..." if len(stripped) > 80 else "")
|
||||
warnings.append(f"Malformed bullet: {short}")
|
||||
return warnings
|
||||
|
||||
|
||||
def _validate_diff(old_memory: str | None, new_memory: str) -> list[str]:
|
||||
"""Return a list of warning strings about suspicious changes."""
|
||||
if not old_memory:
|
||||
return []
|
||||
|
||||
warnings: list[str] = []
|
||||
old_headings = _extract_headings(old_memory)
|
||||
new_headings = _extract_headings(new_memory)
|
||||
dropped = old_headings - new_headings
|
||||
if dropped:
|
||||
names = ", ".join(sorted(dropped))
|
||||
warnings.append(
|
||||
f"Sections removed: {names}. "
|
||||
"If unintentional, the user can restore from the settings page."
|
||||
)
|
||||
|
||||
old_len = len(old_memory)
|
||||
new_len = len(new_memory)
|
||||
if old_len > 0 and new_len < old_len * 0.4:
|
||||
warnings.append(
|
||||
f"Memory shrank significantly ({old_len:,} -> {new_len:,} chars). "
|
||||
"Possible data loss."
|
||||
)
|
||||
return warnings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Size validation & soft warning
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _validate_memory_size(content: str) -> dict[str, Any] | None:
|
||||
"""Return an error/warning dict if *content* is too large, else None."""
|
||||
length = len(content)
|
||||
if length > MEMORY_HARD_LIMIT:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": (
|
||||
f"Memory exceeds {MEMORY_HARD_LIMIT:,} character limit "
|
||||
f"({length:,} chars). Consolidate by merging related items, "
|
||||
"removing outdated entries, and shortening descriptions. "
|
||||
"Then call update_memory again."
|
||||
),
|
||||
}
|
||||
return None
|
||||
|
||||
|
||||
def _soft_warning(content: str) -> str | None:
|
||||
"""Return a warning string if content exceeds the soft limit."""
|
||||
length = len(content)
|
||||
if length > MEMORY_SOFT_LIMIT:
|
||||
return (
|
||||
f"Memory is at {length:,}/{MEMORY_HARD_LIMIT:,} characters. "
|
||||
"Consolidate by merging related items and removing less important "
|
||||
"entries on your next update."
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Forced rewrite when memory exceeds the hard limit
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_FORCED_REWRITE_PROMPT = """\
|
||||
You are a memory curator. The following memory document exceeds the character \
|
||||
limit and must be shortened.
|
||||
|
||||
RULES:
|
||||
1. Rewrite the document to be under {target} characters.
|
||||
2. Preserve existing ## headings. Every entry must remain under a heading. You may merge
|
||||
or rename headings to consolidate, but keep names personal and descriptive.
|
||||
3. Priority for keeping content: [instr] > [pref] > [fact].
|
||||
4. Merge duplicate entries, remove outdated entries, shorten verbose descriptions.
|
||||
5. Every bullet MUST have format: - (YYYY-MM-DD) [fact|pref|instr] text
|
||||
6. Preserve the user's first name in entries — do not replace it with "the user".
|
||||
7. Output ONLY the consolidated markdown — no explanations, no wrapping.
|
||||
|
||||
<memory_document>
|
||||
{content}
|
||||
</memory_document>"""
|
||||
|
||||
|
||||
async def _forced_rewrite(content: str, llm: Any) -> str | None:
|
||||
"""Use a focused LLM call to compress *content* under the hard limit.
|
||||
|
||||
Returns the rewritten string, or ``None`` if the call fails.
|
||||
"""
|
||||
try:
|
||||
prompt = _FORCED_REWRITE_PROMPT.format(
|
||||
target=MEMORY_HARD_LIMIT, content=content
|
||||
)
|
||||
response = await llm.ainvoke(
|
||||
[HumanMessage(content=prompt)],
|
||||
config={"tags": ["surfsense:internal"]},
|
||||
)
|
||||
text = (
|
||||
response.content
|
||||
if isinstance(response.content, str)
|
||||
else str(response.content)
|
||||
)
|
||||
return text.strip()
|
||||
except Exception:
|
||||
logger.exception("Forced rewrite LLM call failed")
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared save-and-respond logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _save_memory(
|
||||
*,
|
||||
updated_memory: str,
|
||||
old_memory: str | None,
|
||||
llm: Any | None,
|
||||
apply_fn,
|
||||
commit_fn,
|
||||
rollback_fn,
|
||||
label: str,
|
||||
scope: Literal["user", "team"],
|
||||
) -> dict[str, Any]:
|
||||
"""Validate, optionally force-rewrite if over the hard limit, save, and
|
||||
return a response dict.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
updated_memory : str
|
||||
The new document the agent submitted.
|
||||
old_memory : str | None
|
||||
The previously persisted document (for diff checks).
|
||||
llm : Any | None
|
||||
LLM instance for forced rewrite (may be ``None``).
|
||||
apply_fn : callable(str) -> None
|
||||
Callback that sets the new memory on the ORM object.
|
||||
commit_fn : coroutine
|
||||
``session.commit``.
|
||||
rollback_fn : coroutine
|
||||
``session.rollback``.
|
||||
label : str
|
||||
Human label for log messages (e.g. "user memory", "team memory").
|
||||
"""
|
||||
content = updated_memory
|
||||
|
||||
# --- forced rewrite if over the hard limit ---
|
||||
if len(content) > MEMORY_HARD_LIMIT and llm is not None:
|
||||
rewritten = await _forced_rewrite(content, llm)
|
||||
if rewritten is not None and len(rewritten) < len(content):
|
||||
content = rewritten
|
||||
|
||||
# --- hard-limit gate (reject if still too large after rewrite) ---
|
||||
size_err = _validate_memory_size(content)
|
||||
if size_err:
|
||||
return size_err
|
||||
|
||||
scope_err = _validate_memory_scope(content, scope)
|
||||
if scope_err:
|
||||
return scope_err
|
||||
|
||||
# --- persist ---
|
||||
try:
|
||||
apply_fn(content)
|
||||
await commit_fn()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update %s: %s", label, e)
|
||||
await rollback_fn()
|
||||
return {"status": "error", "message": f"Failed to update {label}: {e}"}
|
||||
|
||||
# --- build response ---
|
||||
resp: dict[str, Any] = {
|
||||
"status": "saved",
|
||||
"message": f"{label.capitalize()} updated.",
|
||||
}
|
||||
|
||||
if content is not updated_memory:
|
||||
resp["notice"] = "Memory was automatically rewritten to fit within limits."
|
||||
|
||||
diff_warnings = _validate_diff(old_memory, content)
|
||||
if diff_warnings:
|
||||
resp["diff_warnings"] = diff_warnings
|
||||
|
||||
format_warnings = _validate_bullet_format(content)
|
||||
if format_warnings:
|
||||
resp["format_warnings"] = format_warnings
|
||||
|
||||
warning = _soft_warning(content)
|
||||
if warning:
|
||||
resp["warning"] = warning
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tool factories
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def create_update_memory_tool(
|
||||
user_id: str | UUID,
|
||||
db_session: AsyncSession,
|
||||
llm: Any | None = None,
|
||||
):
|
||||
uid = UUID(user_id) if isinstance(user_id, str) else user_id
|
||||
|
||||
@tool
|
||||
async def update_memory(updated_memory: str) -> dict[str, Any]:
|
||||
"""Update the user's personal memory document.
|
||||
|
||||
Your current memory is shown in <user_memory> in the system prompt.
|
||||
When the user shares important long-term information (preferences,
|
||||
facts, instructions, context), rewrite the memory document to include
|
||||
the new information. Merge new facts with existing ones, update
|
||||
contradictions, remove outdated entries, and keep it concise.
|
||||
|
||||
Args:
|
||||
updated_memory: The FULL updated markdown document (not a diff).
|
||||
"""
|
||||
try:
|
||||
result = await db_session.execute(select(User).where(User.id == uid))
|
||||
user = result.scalars().first()
|
||||
if not user:
|
||||
return {"status": "error", "message": "User not found."}
|
||||
|
||||
old_memory = user.memory_md
|
||||
|
||||
return await _save_memory(
|
||||
updated_memory=updated_memory,
|
||||
old_memory=old_memory,
|
||||
llm=llm,
|
||||
apply_fn=lambda content: setattr(user, "memory_md", content),
|
||||
commit_fn=db_session.commit,
|
||||
rollback_fn=db_session.rollback,
|
||||
label="memory",
|
||||
scope="user",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update user memory: %s", e)
|
||||
await db_session.rollback()
|
||||
return {
|
||||
"status": "error",
|
||||
"message": f"Failed to update memory: {e}",
|
||||
}
|
||||
|
||||
return update_memory
|
||||
|
||||
|
||||
def create_update_team_memory_tool(
|
||||
search_space_id: int,
|
||||
db_session: AsyncSession,
|
||||
llm: Any | None = None,
|
||||
):
|
||||
@tool
|
||||
async def update_memory(updated_memory: str) -> dict[str, Any]:
|
||||
"""Update the team's shared memory document for this search space.
|
||||
|
||||
Your current team memory is shown in <team_memory> in the system
|
||||
prompt. When the team shares important long-term information
|
||||
(decisions, conventions, key facts, priorities), rewrite the memory
|
||||
document to include the new information. Merge new facts with
|
||||
existing ones, update contradictions, remove outdated entries, and
|
||||
keep it concise.
|
||||
|
||||
Args:
|
||||
updated_memory: The FULL updated markdown document (not a diff).
|
||||
"""
|
||||
try:
|
||||
result = await db_session.execute(
|
||||
select(SearchSpace).where(SearchSpace.id == search_space_id)
|
||||
)
|
||||
space = result.scalars().first()
|
||||
if not space:
|
||||
return {"status": "error", "message": "Search space not found."}
|
||||
|
||||
old_memory = space.shared_memory_md
|
||||
|
||||
return await _save_memory(
|
||||
updated_memory=updated_memory,
|
||||
old_memory=old_memory,
|
||||
llm=llm,
|
||||
apply_fn=lambda content: setattr(space, "shared_memory_md", content),
|
||||
commit_fn=db_session.commit,
|
||||
rollback_fn=db_session.rollback,
|
||||
label="team memory",
|
||||
scope="team",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update team memory: %s", e)
|
||||
await db_session.rollback()
|
||||
return {
|
||||
"status": "error",
|
||||
"message": f"Failed to update team memory: {e}",
|
||||
}
|
||||
|
||||
return update_memory
|
||||
Loading…
Add table
Add a link
Reference in a new issue