diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 1f998d01a..8c8587cea 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -197,13 +197,6 @@ LLAMA_CLOUD_API_KEY=llx-nnn # AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/ # AZURE_DI_KEY=your-key -# Daytona Sandbox (isolated code execution) -# DAYTONA_SANDBOX_ENABLED=FALSE -# DAYTONA_API_KEY=your-daytona-api-key -# DAYTONA_API_URL=https://app.daytona.io/api -# DAYTONA_TARGET=us -# DAYTONA_SNAPSHOT_ID= - # OPTIONAL: Add these for LangSmith Observability LANGSMITH_TRACING=true LANGSMITH_ENDPOINT=https://api.smith.langchain.com diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index 9bf38cad6..6ff98badf 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -439,7 +439,6 @@ async def create_surfsense_deep_agent( SurfSenseFilesystemMiddleware( search_space_id=search_space_id, created_by_id=user_id, - thread_id=thread_id, ), create_summarization_middleware(llm, StateBackend), PatchToolCallsMiddleware(), @@ -467,7 +466,6 @@ async def create_surfsense_deep_agent( SurfSenseFilesystemMiddleware( search_space_id=search_space_id, created_by_id=user_id, - thread_id=thread_id, ), SubAgentMiddleware(backend=StateBackend, subagents=[general_purpose_spec]), create_summarization_middleware(llm, StateBackend), diff --git a/surfsense_backend/app/agents/new_chat/middleware/filesystem.py b/surfsense_backend/app/agents/new_chat/middleware/filesystem.py index af5a6925b..d7697ef15 100644 --- a/surfsense_backend/app/agents/new_chat/middleware/filesystem.py +++ b/surfsense_backend/app/agents/new_chat/middleware/filesystem.py @@ -7,12 +7,10 @@ This middleware customizes prompts and persists write/edit operations for from __future__ import annotations import asyncio -import logging import re from datetime import UTC, datetime from typing import Annotated, Any -from daytona.common.errors import DaytonaError from deepagents import FilesystemMiddleware from deepagents.backends.protocol import EditResult, WriteResult from deepagents.backends.utils import validate_path @@ -25,11 +23,6 @@ from langchain_core.tools import BaseTool, StructuredTool from langgraph.types import Command from sqlalchemy import delete, select -from app.agents.new_chat.sandbox import ( - _evict_sandbox_cache, - get_or_create_sandbox, - is_sandbox_enabled, -) from app.db import Chunk, Document, DocumentType, Folder, shielded_async_session from app.indexing_pipeline.document_chunker import chunk_text from app.utils.document_converters import ( @@ -38,8 +31,6 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) -logger = logging.getLogger(__name__) - # ============================================================================= # System Prompt (injected into every model call by wrap_model_call) # ============================================================================= @@ -49,7 +40,7 @@ SURFSENSE_FILESYSTEM_SYSTEM_PROMPT = """## Following Conventions - Read files before editing — understand existing content before making changes. - Mimic existing style, naming conventions, and patterns. -## Filesystem Tools +## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file`, `glob`, `grep`, `save_document` All file paths must start with a `/`. - ls: list files and directories at a given path. @@ -137,24 +128,6 @@ SURFSENSE_GREP_TOOL_DESCRIPTION = """Search for a literal text pattern across fi Use this to locate relevant document files/chunks before reading full files. """ -SURFSENSE_EXECUTE_CODE_TOOL_DESCRIPTION = """Executes Python code in an isolated sandbox environment. - -Common data-science packages are pre-installed (pandas, numpy, matplotlib, -scipy, scikit-learn). - -When to use this tool: use execute_code for numerical computation, data -analysis, statistics, and any task that benefits from running Python code. -Never perform arithmetic manually when this tool is available. - -Usage notes: -- No outbound network access. -- Returns combined stdout/stderr with exit code. -- Use print() to produce output. -- You can create files, run shell commands via subprocess or os.system(), - and use any standard library module. -- Use the optional timeout parameter to override the default timeout. -""" - SURFSENSE_SAVE_DOCUMENT_TOOL_DESCRIPTION = """Permanently saves a document to the user's knowledge base. This is an expensive operation — it creates a new Document record in the @@ -175,36 +148,17 @@ Args: class SurfSenseFilesystemMiddleware(FilesystemMiddleware): """SurfSense-specific filesystem middleware with DB persistence for docs.""" - _MAX_EXECUTE_TIMEOUT = 300 - def __init__( self, *, search_space_id: int | None = None, created_by_id: str | None = None, - thread_id: int | str | None = None, tool_token_limit_before_evict: int | None = 20000, ) -> None: self._search_space_id = search_space_id self._created_by_id = created_by_id - self._thread_id = thread_id - self._sandbox_available = is_sandbox_enabled() and thread_id is not None - - system_prompt = SURFSENSE_FILESYSTEM_SYSTEM_PROMPT - if self._sandbox_available: - system_prompt += ( - "\n- execute_code: run Python code in an isolated sandbox." - "\n\n## Code Execution" - "\n\nUse execute_code whenever a task benefits from running code." - " Never perform arithmetic manually." - "\n\nDocuments here are XML-wrapped markdown, not raw data files." - " To work with them programmatically, read the document first," - " extract the data, write it as a clean file (CSV, JSON, etc.)," - " and then run your code against it." - ) - super().__init__( - system_prompt=system_prompt, + system_prompt=SURFSENSE_FILESYSTEM_SYSTEM_PROMPT, custom_tool_descriptions={ "ls": SURFSENSE_LIST_FILES_TOOL_DESCRIPTION, "read_file": SURFSENSE_READ_FILE_TOOL_DESCRIPTION, @@ -214,12 +168,10 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware): "grep": SURFSENSE_GREP_TOOL_DESCRIPTION, }, tool_token_limit_before_evict=tool_token_limit_before_evict, - max_execute_timeout=self._MAX_EXECUTE_TIMEOUT, ) + # Remove the execute tool (no sandbox backend) self.tools = [t for t in self.tools if t.name != "execute"] self.tools.append(self._create_save_document_tool()) - if self._sandbox_available: - self.tools.append(self._create_execute_code_tool()) @staticmethod def _run_async_blocking(coro: Any) -> Any: @@ -503,108 +455,6 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware): coroutine=async_save_document, ) - def _create_execute_code_tool(self) -> BaseTool: - """Create execute_code tool backed by a Daytona sandbox.""" - - def sync_execute_code( - command: Annotated[ - str, "Python code to execute. Use print() to see output." - ], - runtime: ToolRuntime[None, FilesystemState], - timeout: Annotated[ - int | None, - "Optional timeout in seconds.", - ] = None, - ) -> str: - if timeout is not None: - if timeout < 0: - return f"Error: timeout must be non-negative, got {timeout}." - if timeout > self._MAX_EXECUTE_TIMEOUT: - return f"Error: timeout {timeout}s exceeds maximum ({self._MAX_EXECUTE_TIMEOUT}s)." - return self._run_async_blocking( - self._execute_in_sandbox(command, runtime, timeout) - ) - - async def async_execute_code( - command: Annotated[ - str, "Python code to execute. Use print() to see output." - ], - runtime: ToolRuntime[None, FilesystemState], - timeout: Annotated[ - int | None, - "Optional timeout in seconds.", - ] = None, - ) -> str: - if timeout is not None: - if timeout < 0: - return f"Error: timeout must be non-negative, got {timeout}." - if timeout > self._MAX_EXECUTE_TIMEOUT: - return f"Error: timeout {timeout}s exceeds maximum ({self._MAX_EXECUTE_TIMEOUT}s)." - return await self._execute_in_sandbox(command, runtime, timeout) - - return StructuredTool.from_function( - name="execute_code", - description=SURFSENSE_EXECUTE_CODE_TOOL_DESCRIPTION, - func=sync_execute_code, - coroutine=async_execute_code, - ) - - @staticmethod - def _wrap_as_python(code: str) -> str: - """Wrap Python code in a shell invocation for the sandbox.""" - return f"python3 << 'PYEOF'\n{code}\nPYEOF" - - async def _execute_in_sandbox( - self, - command: str, - runtime: ToolRuntime[None, FilesystemState], - timeout: int | None, - ) -> str: - """Core logic: get sandbox, sync files, run command, handle retries.""" - assert self._thread_id is not None - command = self._wrap_as_python(command) - - try: - return await self._try_sandbox_execute(command, runtime, timeout) - except (DaytonaError, Exception) as first_err: - logger.warning( - "Sandbox execute failed for thread %s, retrying: %s", - self._thread_id, - first_err, - ) - _evict_sandbox_cache(self._thread_id) - try: - return await self._try_sandbox_execute(command, runtime, timeout) - except Exception: - logger.exception( - "Sandbox retry also failed for thread %s", self._thread_id - ) - return "Error: Code execution is temporarily unavailable. Please try again." - - async def _try_sandbox_execute( - self, - command: str, - runtime: ToolRuntime[None, FilesystemState], - timeout: int | None, - ) -> str: - sandbox, is_new = await get_or_create_sandbox(self._thread_id) - # files = runtime.state.get("files") or {} - # await sync_files_to_sandbox(self._thread_id, files, sandbox, is_new) - result = await sandbox.aexecute(command, timeout=timeout) - output = (result.output or "").strip() - if not output and result.exit_code == 0: - return ( - "[Code executed successfully but produced no output. " - "Use print() to display results, then try again.]" - ) - parts = [result.output] - if result.exit_code is not None: - status = "succeeded" if result.exit_code == 0 else "failed" - parts.append(f"\n[Command {status} with exit code {result.exit_code}]") - if result.truncated: - parts.append("\n[Output was truncated due to size limits]") - return "".join(parts) - def _create_write_file_tool(self) -> BaseTool: """Create write_file — ephemeral for /documents/*, persisted otherwise.""" tool_description = ( diff --git a/surfsense_backend/app/agents/new_chat/middleware/knowledge_search.py b/surfsense_backend/app/agents/new_chat/middleware/knowledge_search.py index 06ed4ad80..7b0dd2f71 100644 --- a/surfsense_backend/app/agents/new_chat/middleware/knowledge_search.py +++ b/surfsense_backend/app/agents/new_chat/middleware/knowledge_search.py @@ -774,16 +774,11 @@ class KnowledgeBaseSearchMiddleware(AgentMiddleware): # type: ignore[type-arg] messages = state.get("messages") or [] if not messages: return None - - last_human = None - for msg in reversed(messages): - if isinstance(msg, HumanMessage): - last_human = msg - break - if last_human is None: + last_message = messages[-1] + if not isinstance(last_message, HumanMessage): return None - user_text = _extract_text_from_message(last_human).strip() + user_text = _extract_text_from_message(last_message).strip() if not user_text: return None diff --git a/surfsense_backend/app/agents/new_chat/sandbox.py b/surfsense_backend/app/agents/new_chat/sandbox.py index 614a1b1b9..8b634993b 100644 --- a/surfsense_backend/app/agents/new_chat/sandbox.py +++ b/surfsense_backend/app/agents/new_chat/sandbox.py @@ -42,7 +42,7 @@ class _TimeoutAwareSandbox(DaytonaSandbox): """ def execute(self, command: str, *, timeout: int | None = None) -> ExecuteResponse: - t = timeout if timeout is not None else self._default_timeout + t = timeout if timeout is not None else self._timeout result = self._sandbox.process.exec(command, timeout=t) return ExecuteResponse( output=result.result, @@ -58,10 +58,8 @@ class _TimeoutAwareSandbox(DaytonaSandbox): _daytona_client: Daytona | None = None _sandbox_cache: dict[str, _TimeoutAwareSandbox] = {} -_seeded_files: dict[str, dict[str, str]] = {} _SANDBOX_CACHE_MAX_SIZE = 20 THREAD_LABEL_KEY = "surfsense_thread" -SANDBOX_DOCUMENTS_ROOT = "/home/daytona/documents" def is_sandbox_enabled() -> bool: @@ -80,29 +78,14 @@ def _get_client() -> Daytona: return _daytona_client -def _sandbox_create_params( - labels: dict[str, str], -) -> CreateSandboxFromSnapshotParams: - snapshot_id = os.environ.get("DAYTONA_SNAPSHOT_ID") or None - return CreateSandboxFromSnapshotParams( - language="python", - labels=labels, - snapshot=snapshot_id, - network_block_all=True, - auto_stop_interval=10, - auto_delete_interval=60, - ) - - -def _find_or_create(thread_id: str) -> tuple[_TimeoutAwareSandbox, bool]: +def _find_or_create(thread_id: str) -> _TimeoutAwareSandbox: """Find an existing sandbox for *thread_id*, or create a new one. - Returns a tuple of (sandbox, is_new) where *is_new* is True when a - fresh sandbox was created (first time or replacement after failure). + If an existing sandbox is found but is stopped/archived, it will be + restarted automatically before returning. """ client = _get_client() labels = {THREAD_LABEL_KEY: thread_id} - is_new = False try: sandbox = client.find_one(labels=labels) @@ -126,43 +109,41 @@ def _find_or_create(thread_id: str) -> tuple[_TimeoutAwareSandbox, bool]: sandbox.id, sandbox.state, ) - try: - client.delete(sandbox) - except Exception: - logger.debug("Could not delete broken sandbox %s", sandbox.id, exc_info=True) - sandbox = client.create(_sandbox_create_params(labels)) - is_new = True + sandbox = client.create( + CreateSandboxFromSnapshotParams(language="python", labels=labels) + ) logger.info("Created replacement sandbox: %s", sandbox.id) elif sandbox.state != SandboxState.STARTED: sandbox.wait_for_sandbox_start(timeout=60) except Exception: logger.info("No existing sandbox for thread %s — creating one", thread_id) - sandbox = client.create(_sandbox_create_params(labels)) - is_new = True + sandbox = client.create( + CreateSandboxFromSnapshotParams(language="python", labels=labels) + ) logger.info("Created new sandbox: %s", sandbox.id) - return _TimeoutAwareSandbox(sandbox=sandbox), is_new + return _TimeoutAwareSandbox(sandbox=sandbox) -async def get_or_create_sandbox( - thread_id: int | str, -) -> tuple[_TimeoutAwareSandbox, bool]: +async def get_or_create_sandbox(thread_id: int | str) -> _TimeoutAwareSandbox: """Get or create a sandbox for a conversation thread. Uses an in-process cache keyed by thread_id so subsequent messages in the same conversation reuse the sandbox object without an API call. + Args: + thread_id: The conversation thread identifier. + Returns: - Tuple of (sandbox, is_new). *is_new* is True when a fresh sandbox - was created, signalling that file tracking should be reset. + DaytonaSandbox connected to the sandbox. """ key = str(thread_id) cached = _sandbox_cache.get(key) if cached is not None: logger.info("Reusing cached sandbox for thread %s", key) - return cached, False - sandbox, is_new = await asyncio.to_thread(_find_or_create, key) + return cached + sandbox = await asyncio.to_thread(_find_or_create, key) _sandbox_cache[key] = sandbox if len(_sandbox_cache) > _SANDBOX_CACHE_MAX_SIZE: @@ -170,60 +151,12 @@ async def get_or_create_sandbox( _sandbox_cache.pop(oldest_key, None) logger.debug("Evicted oldest sandbox cache entry: %s", oldest_key) - return sandbox, is_new - - -async def sync_files_to_sandbox( - thread_id: int | str, - files: dict[str, dict], - sandbox: _TimeoutAwareSandbox, - is_new: bool, -) -> None: - """Upload new or changed virtual-filesystem files to the sandbox. - - Compares *files* (from ``state["files"]``) against the ``_seeded_files`` - tracking dict and uploads only what has changed. When *is_new* is True - the tracking is reset so every file is re-uploaded. - """ - key = str(thread_id) - if is_new: - _seeded_files.pop(key, None) - - tracked = _seeded_files.get(key, {}) - to_upload: list[tuple[str, bytes]] = [] - - for vpath, fdata in files.items(): - modified_at = fdata.get("modified_at", "") - if tracked.get(vpath) == modified_at: - continue - content = "\n".join(fdata.get("content", [])) - sandbox_path = f"{SANDBOX_DOCUMENTS_ROOT}{vpath}" - to_upload.append((sandbox_path, content.encode("utf-8"))) - - if not to_upload: - return - - def _upload() -> None: - sandbox.upload_files(to_upload) - - await asyncio.to_thread(_upload) - - new_tracked = dict(tracked) - for vpath, fdata in files.items(): - new_tracked[vpath] = fdata.get("modified_at", "") - _seeded_files[key] = new_tracked - logger.info("Synced %d file(s) to sandbox for thread %s", len(to_upload), key) - - -def _evict_sandbox_cache(thread_id: int | str) -> None: - key = str(thread_id) - _sandbox_cache.pop(key, None) - _seeded_files.pop(key, None) + return sandbox async def delete_sandbox(thread_id: int | str) -> None: """Delete the sandbox for a conversation thread.""" - _evict_sandbox_cache(thread_id) + _sandbox_cache.pop(str(thread_id), None) def _delete() -> None: client = _get_client() @@ -260,11 +193,7 @@ def _get_sandbox_files_dir() -> Path: def _local_path_for(thread_id: int | str, sandbox_path: str) -> Path: """Map a sandbox-internal absolute path to a local filesystem path.""" relative = sandbox_path.lstrip("/") - base = (_get_sandbox_files_dir() / str(thread_id)).resolve() - target = (base / relative).resolve() - if not target.is_relative_to(base): - raise ValueError(f"Path traversal blocked: {sandbox_path}") - return target + return _get_sandbox_files_dir() / str(thread_id) / relative def get_local_sandbox_file(thread_id: int | str, sandbox_path: str) -> bytes | None: @@ -297,7 +226,7 @@ async def persist_and_delete_sandbox( Per-file errors are logged but do **not** prevent the sandbox from being deleted — freeing Daytona storage is the priority. """ - _evict_sandbox_cache(thread_id) + _sandbox_cache.pop(str(thread_id), None) def _persist_and_delete() -> None: client = _get_client() diff --git a/surfsense_backend/scripts/create_sandbox_snapshot.py b/surfsense_backend/scripts/create_sandbox_snapshot.py deleted file mode 100644 index 97ed6dfe8..000000000 --- a/surfsense_backend/scripts/create_sandbox_snapshot.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Create the Daytona snapshot used by SurfSense code-execution sandboxes. - -Run from the backend directory: - cd surfsense_backend - uv run python scripts/create_sandbox_snapshot.py - -Prerequisites: - - DAYTONA_API_KEY set in surfsense_backend/.env (or exported in shell) - - DAYTONA_API_URL=https://app.daytona.io/api - - DAYTONA_TARGET=us (or eu) - -After this script succeeds, add to surfsense_backend/.env: - DAYTONA_SNAPSHOT_ID=surfsense-sandbox -""" - -import os -import sys -import time -from pathlib import Path - -from dotenv import load_dotenv - -_here = Path(__file__).parent -for candidate in [_here / "../surfsense_backend/.env", _here / ".env", _here / "../.env"]: - if candidate.exists(): - load_dotenv(candidate) - break - -from daytona import CreateSnapshotParams, Daytona, Image # noqa: E402 - -SNAPSHOT_NAME = "surfsense-sandbox" - -PACKAGES = [ - "pandas", - "numpy", - "matplotlib", - "scipy", - "scikit-learn", -] - - -def build_image() -> Image: - """Build the sandbox image with data-science packages and a /documents symlink.""" - return ( - Image.debian_slim("3.12") - .pip_install(*PACKAGES) - # Symlink /documents → /home/daytona/documents so the LLM can use - # the same /documents/ path it sees in the virtual filesystem. - .run_commands( - "mkdir -p /home/daytona/documents", - "ln -sfn /home/daytona/documents /documents", - ) - ) - - -def main() -> None: - api_key = os.environ.get("DAYTONA_API_KEY") - if not api_key: - print("ERROR: DAYTONA_API_KEY is not set.", file=sys.stderr) - print("Add it to surfsense_backend/.env or export it in your shell.", file=sys.stderr) - sys.exit(1) - - daytona = Daytona() - - try: - existing = daytona.snapshot.get(SNAPSHOT_NAME) - print(f"Deleting existing snapshot '{SNAPSHOT_NAME}' …") - daytona.snapshot.delete(existing) - print(f"Deleted '{SNAPSHOT_NAME}'. Waiting for removal to propagate …") - for attempt in range(30): - time.sleep(2) - try: - daytona.snapshot.get(SNAPSHOT_NAME) - except Exception: - print(f"Confirmed '{SNAPSHOT_NAME}' is gone.\n") - break - else: - print(f"WARNING: '{SNAPSHOT_NAME}' may still exist after 60s. Proceeding anyway.\n") - except Exception: - pass - - print(f"Building snapshot '{SNAPSHOT_NAME}' …") - print(f"Packages: {', '.join(PACKAGES)}\n") - - daytona.snapshot.create( - CreateSnapshotParams(name=SNAPSHOT_NAME, image=build_image()), - on_logs=lambda chunk: print(chunk, end="", flush=True), - ) - - print(f"\n\nSnapshot '{SNAPSHOT_NAME}' is ready.") - print("\nAdd this to surfsense_backend/.env:") - print(f" DAYTONA_SNAPSHOT_ID={SNAPSHOT_NAME}") - - -if __name__ == "__main__": - main()