diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 8c8587cea..1f998d01a 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -197,6 +197,13 @@ LLAMA_CLOUD_API_KEY=llx-nnn # AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/ # AZURE_DI_KEY=your-key +# Daytona Sandbox (isolated code execution) +# DAYTONA_SANDBOX_ENABLED=FALSE +# DAYTONA_API_KEY=your-daytona-api-key +# DAYTONA_API_URL=https://app.daytona.io/api +# DAYTONA_TARGET=us +# DAYTONA_SNAPSHOT_ID= + # OPTIONAL: Add these for LangSmith Observability LANGSMITH_TRACING=true LANGSMITH_ENDPOINT=https://api.smith.langchain.com diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index 6ff98badf..9bf38cad6 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -439,6 +439,7 @@ async def create_surfsense_deep_agent( SurfSenseFilesystemMiddleware( search_space_id=search_space_id, created_by_id=user_id, + thread_id=thread_id, ), create_summarization_middleware(llm, StateBackend), PatchToolCallsMiddleware(), @@ -466,6 +467,7 @@ async def create_surfsense_deep_agent( SurfSenseFilesystemMiddleware( search_space_id=search_space_id, created_by_id=user_id, + thread_id=thread_id, ), SubAgentMiddleware(backend=StateBackend, subagents=[general_purpose_spec]), create_summarization_middleware(llm, StateBackend), diff --git a/surfsense_backend/app/agents/new_chat/middleware/filesystem.py b/surfsense_backend/app/agents/new_chat/middleware/filesystem.py index d7697ef15..af5a6925b 100644 --- a/surfsense_backend/app/agents/new_chat/middleware/filesystem.py +++ b/surfsense_backend/app/agents/new_chat/middleware/filesystem.py @@ -7,10 +7,12 @@ This middleware customizes prompts and persists write/edit operations for from __future__ import annotations import asyncio +import logging import re from datetime 
# Tool description surfaced to the model for the execute_code tool.
# NOTE: this tool is only registered when the Daytona sandbox is enabled
# and a thread_id is available (see SurfSenseFilesystemMiddleware.__init__).
SURFSENSE_EXECUTE_CODE_TOOL_DESCRIPTION = """Executes Python code in an isolated sandbox environment.

Common data-science packages are pre-installed (pandas, numpy, matplotlib,
scipy, scikit-learn).

When to use this tool: use execute_code for numerical computation, data
analysis, statistics, and any task that benefits from running Python code.
Never perform arithmetic manually when this tool is available.

Usage notes:
- No outbound network access.
- Returns combined stdout/stderr with exit code.
- Use print() to produce output.
- You can create files, run shell commands via subprocess or os.system(),
  and use any standard library module.
- Use the optional timeout parameter to override the default timeout.
"""
# NOTE(review): methods of SurfSenseFilesystemMiddleware, reconstructed from
# the patch hunks. Changes vs. the patch:
#   1. timeout validation deduplicated into _validate_execute_timeout (the
#      sync and async tool variants carried identical copies);
#   2. `except (DaytonaError, Exception)` collapsed to `except Exception` —
#      DaytonaError is an Exception subclass, so the tuple was redundant;
#   3. `parts = [result.output]` guarded with `or ""` — on a failing command
#      with no output, result.output can be None and "".join would raise
#      TypeError (the earlier `or ""` only feeds the exit_code==0 early return).

def _validate_execute_timeout(self, timeout: int | None) -> str | None:
    """Return an error string if *timeout* is invalid, else None.

    Shared by the sync and async variants of execute_code so the two
    validation paths cannot drift apart.
    """
    if timeout is None:
        return None
    if timeout < 0:
        return f"Error: timeout must be non-negative, got {timeout}."
    if timeout > self._MAX_EXECUTE_TIMEOUT:
        return f"Error: timeout {timeout}s exceeds maximum ({self._MAX_EXECUTE_TIMEOUT}s)."
    return None

def _create_execute_code_tool(self) -> BaseTool:
    """Create the execute_code tool backed by a per-thread Daytona sandbox."""

    def sync_execute_code(
        command: Annotated[
            str, "Python code to execute. Use print() to see output."
        ],
        runtime: ToolRuntime[None, FilesystemState],
        timeout: Annotated[
            int | None,
            "Optional timeout in seconds.",
        ] = None,
    ) -> str:
        # Reject bad timeouts before any sandbox work is started.
        error = self._validate_execute_timeout(timeout)
        if error is not None:
            return error
        return self._run_async_blocking(
            self._execute_in_sandbox(command, runtime, timeout)
        )

    async def async_execute_code(
        command: Annotated[
            str, "Python code to execute. Use print() to see output."
        ],
        runtime: ToolRuntime[None, FilesystemState],
        timeout: Annotated[
            int | None,
            "Optional timeout in seconds.",
        ] = None,
    ) -> str:
        error = self._validate_execute_timeout(timeout)
        if error is not None:
            return error
        return await self._execute_in_sandbox(command, runtime, timeout)

    return StructuredTool.from_function(
        name="execute_code",
        description=SURFSENSE_EXECUTE_CODE_TOOL_DESCRIPTION,
        func=sync_execute_code,
        coroutine=async_execute_code,
    )

@staticmethod
def _wrap_as_python(code: str) -> str:
    """Wrap Python code in a shell heredoc invocation for the sandbox.

    NOTE(review): a code line consisting of exactly ``PYEOF`` would close
    the heredoc early — acceptable for LLM-generated snippets, but confirm.
    """
    return f"python3 << 'PYEOF'\n{code}\nPYEOF"

async def _execute_in_sandbox(
    self,
    command: str,
    runtime: ToolRuntime[None, FilesystemState],
    timeout: int | None,
) -> str:
    """Core logic: get sandbox, run command, retry once on failure.

    On the first failure the cached sandbox entry is evicted so the retry
    obtains (or re-creates) a fresh sandbox for this thread.
    """
    # Tool is only registered when thread_id is set (see __init__).
    assert self._thread_id is not None
    command = self._wrap_as_python(command)

    try:
        return await self._try_sandbox_execute(command, runtime, timeout)
    except Exception as first_err:
        logger.warning(
            "Sandbox execute failed for thread %s, retrying: %s",
            self._thread_id,
            first_err,
        )
        _evict_sandbox_cache(self._thread_id)
        try:
            return await self._try_sandbox_execute(command, runtime, timeout)
        except Exception:
            logger.exception(
                "Sandbox retry also failed for thread %s", self._thread_id
            )
            return "Error: Code execution is temporarily unavailable. Please try again."

async def _try_sandbox_execute(
    self,
    command: str,
    runtime: ToolRuntime[None, FilesystemState],
    timeout: int | None,
) -> str:
    """Single attempt: obtain the thread's sandbox, execute, format output."""
    sandbox, is_new = await get_or_create_sandbox(self._thread_id)
    # NOTE(review): virtual-filesystem sync is disabled here although
    # sync_files_to_sandbox exists in app.agents.new_chat.sandbox — confirm
    # whether this is intentionally deferred or an oversight.
    # files = runtime.state.get("files") or {}
    # await sync_files_to_sandbox(self._thread_id, files, sandbox, is_new)
    result = await sandbox.aexecute(command, timeout=timeout)
    output = (result.output or "").strip()
    if not output and result.exit_code == 0:
        return (
            "[Code executed successfully but produced no output. "
            "Use print() to display results, then try again.]"
        )
    # result.output may be None on failure — guard before joining.
    parts = [result.output or ""]
    if result.exit_code is not None:
        status = "succeeded" if result.exit_code == 0 else "failed"
        parts.append(f"\n[Command {status} with exit code {result.exit_code}]")
    if result.truncated:
        parts.append("\n[Output was truncated due to size limits]")
    return "".join(parts)
def _sandbox_create_params(
    labels: dict[str, str],
) -> CreateSandboxFromSnapshotParams:
    """Build creation parameters for a new SurfSense sandbox.

    The optional ``DAYTONA_SNAPSHOT_ID`` environment variable selects a
    prebuilt snapshot (see scripts/create_sandbox_snapshot.py); when unset
    or empty, ``snapshot`` is None and Daytona uses its default image.
    """
    configured_snapshot = os.environ.get("DAYTONA_SNAPSHOT_ID")
    return CreateSandboxFromSnapshotParams(
        language="python",
        labels=labels,
        snapshot=configured_snapshot if configured_snapshot else None,
        # No outbound network from user code.
        network_block_all=True,
        # NOTE(review): interval units are presumably minutes per the
        # Daytona SDK — confirm against its documentation.
        auto_stop_interval=10,
        auto_delete_interval=60,
    )
""" client = _get_client() labels = {THREAD_LABEL_KEY: thread_id} + is_new = False try: sandbox = client.find_one(labels=labels) @@ -109,41 +126,43 @@ def _find_or_create(thread_id: str) -> _TimeoutAwareSandbox: sandbox.id, sandbox.state, ) - sandbox = client.create( - CreateSandboxFromSnapshotParams(language="python", labels=labels) - ) + try: + client.delete(sandbox) + except Exception: + logger.debug("Could not delete broken sandbox %s", sandbox.id, exc_info=True) + sandbox = client.create(_sandbox_create_params(labels)) + is_new = True logger.info("Created replacement sandbox: %s", sandbox.id) elif sandbox.state != SandboxState.STARTED: sandbox.wait_for_sandbox_start(timeout=60) except Exception: logger.info("No existing sandbox for thread %s — creating one", thread_id) - sandbox = client.create( - CreateSandboxFromSnapshotParams(language="python", labels=labels) - ) + sandbox = client.create(_sandbox_create_params(labels)) + is_new = True logger.info("Created new sandbox: %s", sandbox.id) - return _TimeoutAwareSandbox(sandbox=sandbox) + return _TimeoutAwareSandbox(sandbox=sandbox), is_new -async def get_or_create_sandbox(thread_id: int | str) -> _TimeoutAwareSandbox: +async def get_or_create_sandbox( + thread_id: int | str, +) -> tuple[_TimeoutAwareSandbox, bool]: """Get or create a sandbox for a conversation thread. Uses an in-process cache keyed by thread_id so subsequent messages in the same conversation reuse the sandbox object without an API call. - Args: - thread_id: The conversation thread identifier. - Returns: - DaytonaSandbox connected to the sandbox. + Tuple of (sandbox, is_new). *is_new* is True when a fresh sandbox + was created, signalling that file tracking should be reset. 
""" key = str(thread_id) cached = _sandbox_cache.get(key) if cached is not None: logger.info("Reusing cached sandbox for thread %s", key) - return cached - sandbox = await asyncio.to_thread(_find_or_create, key) + return cached, False + sandbox, is_new = await asyncio.to_thread(_find_or_create, key) _sandbox_cache[key] = sandbox if len(_sandbox_cache) > _SANDBOX_CACHE_MAX_SIZE: @@ -151,12 +170,60 @@ async def get_or_create_sandbox(thread_id: int | str) -> _TimeoutAwareSandbox: _sandbox_cache.pop(oldest_key, None) logger.debug("Evicted oldest sandbox cache entry: %s", oldest_key) - return sandbox + return sandbox, is_new + + +async def sync_files_to_sandbox( + thread_id: int | str, + files: dict[str, dict], + sandbox: _TimeoutAwareSandbox, + is_new: bool, +) -> None: + """Upload new or changed virtual-filesystem files to the sandbox. + + Compares *files* (from ``state["files"]``) against the ``_seeded_files`` + tracking dict and uploads only what has changed. When *is_new* is True + the tracking is reset so every file is re-uploaded. 
+ """ + key = str(thread_id) + if is_new: + _seeded_files.pop(key, None) + + tracked = _seeded_files.get(key, {}) + to_upload: list[tuple[str, bytes]] = [] + + for vpath, fdata in files.items(): + modified_at = fdata.get("modified_at", "") + if tracked.get(vpath) == modified_at: + continue + content = "\n".join(fdata.get("content", [])) + sandbox_path = f"{SANDBOX_DOCUMENTS_ROOT}{vpath}" + to_upload.append((sandbox_path, content.encode("utf-8"))) + + if not to_upload: + return + + def _upload() -> None: + sandbox.upload_files(to_upload) + + await asyncio.to_thread(_upload) + + new_tracked = dict(tracked) + for vpath, fdata in files.items(): + new_tracked[vpath] = fdata.get("modified_at", "") + _seeded_files[key] = new_tracked + logger.info("Synced %d file(s) to sandbox for thread %s", len(to_upload), key) + + +def _evict_sandbox_cache(thread_id: int | str) -> None: + key = str(thread_id) + _sandbox_cache.pop(key, None) + _seeded_files.pop(key, None) async def delete_sandbox(thread_id: int | str) -> None: """Delete the sandbox for a conversation thread.""" - _sandbox_cache.pop(str(thread_id), None) + _evict_sandbox_cache(thread_id) def _delete() -> None: client = _get_client() @@ -193,7 +260,11 @@ def _get_sandbox_files_dir() -> Path: def _local_path_for(thread_id: int | str, sandbox_path: str) -> Path: """Map a sandbox-internal absolute path to a local filesystem path.""" relative = sandbox_path.lstrip("/") - return _get_sandbox_files_dir() / str(thread_id) / relative + base = (_get_sandbox_files_dir() / str(thread_id)).resolve() + target = (base / relative).resolve() + if not target.is_relative_to(base): + raise ValueError(f"Path traversal blocked: {sandbox_path}") + return target def get_local_sandbox_file(thread_id: int | str, sandbox_path: str) -> bytes | None: @@ -226,7 +297,7 @@ async def persist_and_delete_sandbox( Per-file errors are logged but do **not** prevent the sandbox from being deleted — freeing Daytona storage is the priority. 
""" - _sandbox_cache.pop(str(thread_id), None) + _evict_sandbox_cache(thread_id) def _persist_and_delete() -> None: client = _get_client() diff --git a/surfsense_backend/scripts/create_sandbox_snapshot.py b/surfsense_backend/scripts/create_sandbox_snapshot.py new file mode 100644 index 000000000..97ed6dfe8 --- /dev/null +++ b/surfsense_backend/scripts/create_sandbox_snapshot.py @@ -0,0 +1,96 @@ +"""Create the Daytona snapshot used by SurfSense code-execution sandboxes. + +Run from the backend directory: + cd surfsense_backend + uv run python scripts/create_sandbox_snapshot.py + +Prerequisites: + - DAYTONA_API_KEY set in surfsense_backend/.env (or exported in shell) + - DAYTONA_API_URL=https://app.daytona.io/api + - DAYTONA_TARGET=us (or eu) + +After this script succeeds, add to surfsense_backend/.env: + DAYTONA_SNAPSHOT_ID=surfsense-sandbox +""" + +import os +import sys +import time +from pathlib import Path + +from dotenv import load_dotenv + +_here = Path(__file__).parent +for candidate in [_here / "../surfsense_backend/.env", _here / ".env", _here / "../.env"]: + if candidate.exists(): + load_dotenv(candidate) + break + +from daytona import CreateSnapshotParams, Daytona, Image # noqa: E402 + +SNAPSHOT_NAME = "surfsense-sandbox" + +PACKAGES = [ + "pandas", + "numpy", + "matplotlib", + "scipy", + "scikit-learn", +] + + +def build_image() -> Image: + """Build the sandbox image with data-science packages and a /documents symlink.""" + return ( + Image.debian_slim("3.12") + .pip_install(*PACKAGES) + # Symlink /documents → /home/daytona/documents so the LLM can use + # the same /documents/ path it sees in the virtual filesystem. 
+ .run_commands( + "mkdir -p /home/daytona/documents", + "ln -sfn /home/daytona/documents /documents", + ) + ) + + +def main() -> None: + api_key = os.environ.get("DAYTONA_API_KEY") + if not api_key: + print("ERROR: DAYTONA_API_KEY is not set.", file=sys.stderr) + print("Add it to surfsense_backend/.env or export it in your shell.", file=sys.stderr) + sys.exit(1) + + daytona = Daytona() + + try: + existing = daytona.snapshot.get(SNAPSHOT_NAME) + print(f"Deleting existing snapshot '{SNAPSHOT_NAME}' …") + daytona.snapshot.delete(existing) + print(f"Deleted '{SNAPSHOT_NAME}'. Waiting for removal to propagate …") + for attempt in range(30): + time.sleep(2) + try: + daytona.snapshot.get(SNAPSHOT_NAME) + except Exception: + print(f"Confirmed '{SNAPSHOT_NAME}' is gone.\n") + break + else: + print(f"WARNING: '{SNAPSHOT_NAME}' may still exist after 60s. Proceeding anyway.\n") + except Exception: + pass + + print(f"Building snapshot '{SNAPSHOT_NAME}' …") + print(f"Packages: {', '.join(PACKAGES)}\n") + + daytona.snapshot.create( + CreateSnapshotParams(name=SNAPSHOT_NAME, image=build_image()), + on_logs=lambda chunk: print(chunk, end="", flush=True), + ) + + print(f"\n\nSnapshot '{SNAPSHOT_NAME}' is ready.") + print("\nAdd this to surfsense_backend/.env:") + print(f" DAYTONA_SNAPSHOT_ID={SNAPSHOT_NAME}") + + +if __name__ == "__main__": + main()