From 54dd6f904ceace076b5ae1eb33c602983984f2c1 Mon Sep 17 00:00:00 2001 From: "DESKTOP-RTLN3BA\\$punk" Date: Wed, 25 Feb 2026 12:25:59 -0800 Subject: [PATCH] feat: implement local file persistence for Daytona sandboxes - Added functionality to persist sandbox files locally before deletion. - Introduced methods for retrieving and deleting locally stored sandbox files. - Updated routes to handle local file downloads and background deletion of sandboxes. - Enhanced streaming tasks to support sandbox file management. --- surfsense_backend/.env.example | 4 +- surfsense_backend/.gitignore | 1 + .../app/agents/new_chat/sandbox.py | 111 ++++++++++++++++++ .../app/routes/new_chat_routes.py | 36 ++++++ .../app/routes/sandbox_routes.py | 16 ++- .../app/tasks/chat/stream_new_chat.py | 45 ++++++- 6 files changed, 208 insertions(+), 5 deletions(-) diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index ae17c35f3..3e7102d81 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -173,4 +173,6 @@ LANGSMITH_PROJECT=surfsense DAYTONA_SANDBOX_ENABLED=TRUE DAYTONA_API_KEY=dtn_asdasfasfafas DAYTONA_API_URL=https://app.daytona.io/api -DAYTONA_TARGET=us \ No newline at end of file +DAYTONA_TARGET=us +# Directory for locally-persisted sandbox files (after sandbox deletion) +SANDBOX_FILES_DIR=sandbox_files \ No newline at end of file diff --git a/surfsense_backend/.gitignore b/surfsense_backend/.gitignore index 13a523310..0b1374b29 100644 --- a/surfsense_backend/.gitignore +++ b/surfsense_backend/.gitignore @@ -6,6 +6,7 @@ __pycache__/ .flashrank_cache surf_new_backend.egg-info/ podcasts/ +sandbox_files/ temp_audio/ celerybeat-schedule* celerybeat-schedule.* diff --git a/surfsense_backend/app/agents/new_chat/sandbox.py b/surfsense_backend/app/agents/new_chat/sandbox.py index 996414557..24b380b0b 100644 --- a/surfsense_backend/app/agents/new_chat/sandbox.py +++ b/surfsense_backend/app/agents/new_chat/sandbox.py @@ -4,6 +4,9 @@ Daytona sandbox provider for SurfSense deep agent. Manages the lifecycle of sandboxed code execution environments. Each conversation thread gets its own isolated sandbox instance via the Daytona cloud API, identified by labels. + +Files created during a session are persisted to local storage before +the sandbox is deleted so they remain downloadable after cleanup. """ from __future__ import annotations @@ -11,6 +14,8 @@ from __future__ import annotations import asyncio import logging import os +import shutil +from pathlib import Path from daytona import ( CreateSandboxFromSnapshotParams, @@ -18,6 +23,7 @@ from daytona import ( DaytonaConfig, SandboxState, ) +from daytona.common.errors import DaytonaError from deepagents.backends.protocol import ExecuteResponse from langchain_daytona import DaytonaSandbox @@ -140,6 +146,10 @@ async def delete_sandbox(thread_id: int | str) -> None: labels = {THREAD_LABEL_KEY: str(thread_id)} try: sandbox = client.find_one(labels=labels) + except DaytonaError: + logger.debug("No sandbox to delete for thread %s (already removed)", thread_id) + return + try: client.delete(sandbox) logger.info("Sandbox deleted: %s", sandbox.id) except Exception: @@ -150,3 +160,104 @@ async def delete_sandbox(thread_id: int | str) -> None: ) await asyncio.to_thread(_delete) + + +# --------------------------------------------------------------------------- +# Local file persistence +# --------------------------------------------------------------------------- + +def _get_sandbox_files_dir() -> Path: + return Path(os.environ.get("SANDBOX_FILES_DIR", "sandbox_files")) + + +def _local_path_for(thread_id: int | str, sandbox_path: str) -> Path: + """Map a sandbox-internal absolute path to a local filesystem path.""" + relative = sandbox_path.lstrip("/") + return _get_sandbox_files_dir() / str(thread_id) / relative + + +def get_local_sandbox_file(thread_id: int | str, sandbox_path: str) -> bytes | None: + """Read a previously-persisted sandbox file from local storage. + + Returns the file bytes, or *None* if the file does not exist locally. + """ + local = _local_path_for(thread_id, sandbox_path) + if local.is_file(): + return local.read_bytes() + return None + + +def delete_local_sandbox_files(thread_id: int | str) -> None: + """Remove all locally-persisted sandbox files for a thread.""" + thread_dir = _get_sandbox_files_dir() / str(thread_id) + if thread_dir.is_dir(): + shutil.rmtree(thread_dir, ignore_errors=True) + logger.info("Deleted local sandbox files for thread %s", thread_id) + + +async def persist_and_delete_sandbox( + thread_id: int | str, + sandbox_file_paths: list[str], +) -> None: + """Download sandbox files to local storage, then delete the sandbox. + + Each file in *sandbox_file_paths* is downloaded from the Daytona + sandbox and saved under ``{SANDBOX_FILES_DIR}/{thread_id}/…``. + Per-file errors are logged but do **not** prevent the sandbox from + being deleted — freeing Daytona storage is the priority. + """ + + def _persist_and_delete() -> None: + client = _get_client() + labels = {THREAD_LABEL_KEY: str(thread_id)} + + try: + sandbox = client.find_one(labels=labels) + except Exception: + logger.info( + "No sandbox found for thread %s — nothing to persist", thread_id + ) + return + + # Ensure the sandbox is running so we can download files + if sandbox.state != SandboxState.STARTED: + try: + sandbox.start(timeout=60) + except Exception: + logger.warning( + "Could not start sandbox %s for file download — deleting anyway", + sandbox.id, + exc_info=True, + ) + try: + client.delete(sandbox) + except Exception: + pass + return + + for path in sandbox_file_paths: + try: + content: bytes = sandbox.fs.download_file(path) + local = _local_path_for(thread_id, path) + local.parent.mkdir(parents=True, exist_ok=True) + local.write_bytes(content) + logger.info("Persisted sandbox file %s → %s", path, local) + except Exception: + logger.warning( + "Failed to persist sandbox file %s for thread %s", + path, + thread_id, + exc_info=True, + ) + + try: + client.delete(sandbox) + logger.info("Sandbox deleted after file persistence: %s", sandbox.id) + except Exception: + logger.warning( + "Failed to delete sandbox %s after persistence", + sandbox.id, + exc_info=True, + ) + + await asyncio.to_thread(_persist_and_delete) diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 1713f4ea8..7856a2c17 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -52,9 +52,42 @@ from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat from app.users import current_active_user from app.utils.rbac import check_permission +import asyncio +import logging + +_logger = logging.getLogger(__name__) + router = APIRouter() +def _try_delete_sandbox(thread_id: int) -> None: + """Fire-and-forget sandbox + local file deletion so the HTTP response isn't blocked.""" + from app.agents.new_chat.sandbox import ( + delete_local_sandbox_files, + delete_sandbox, + is_sandbox_enabled, + ) + + if not is_sandbox_enabled(): + return + + async def _bg() -> None: + try: + await delete_sandbox(thread_id) + except Exception: + _logger.warning("Background sandbox delete failed for thread %s", thread_id, exc_info=True) + try: + delete_local_sandbox_files(thread_id) + except Exception: + _logger.warning("Local sandbox file cleanup failed for thread %s", thread_id, exc_info=True) + + try: + loop = asyncio.get_running_loop() + loop.create_task(_bg()) + except RuntimeError: + pass + + async def check_thread_access( session: AsyncSession, thread: NewChatThread, @@ -648,6 +681,9 @@ async def delete_thread( await session.delete(db_thread) await session.commit() + + _try_delete_sandbox(thread_id) + return {"message": "Thread deleted successfully"} except HTTPException: diff --git a/surfsense_backend/app/routes/sandbox_routes.py b/surfsense_backend/app/routes/sandbox_routes.py index 428eea09e..e5b737371 100644 --- a/surfsense_backend/app/routes/sandbox_routes.py +++ b/surfsense_backend/app/routes/sandbox_routes.py @@ -71,9 +71,23 @@ async def download_sandbox_file( "You don't have permission to access files in this thread", ) + from app.agents.new_chat.sandbox import get_local_sandbox_file + + # Prefer locally-persisted copy (sandbox may already be deleted) + local_content = get_local_sandbox_file(thread_id, path) + if local_content is not None: + filename = path.rsplit("/", 1)[-1] if "/" in path else path + media_type = _guess_media_type(filename) + return Response( + content=local_content, + media_type=media_type, + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, + ) + + # Fall back to live sandbox download try: sandbox = await get_or_create_sandbox(thread_id) - raw_sandbox = sandbox._sandbox + raw_sandbox = sandbox._sandbox # noqa: SLF001 content: bytes = await asyncio.to_thread(raw_sandbox.fs.download_file, path) except Exception as exc: logger.warning("Sandbox file download failed for %s: %s", path, exc) diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index ae04a6bee..bf942f548 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -9,11 +9,12 @@ Supports loading LLM configurations from: - NewLLMConfig database table (positive IDs for user-created configs with prompt settings) """ +import asyncio import json import logging import re from collections.abc import AsyncGenerator -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any from uuid import UUID @@ -193,6 +194,7 @@ class StreamResult: accumulated_text: str = "" is_interrupted: bool = False interrupt_value: dict[str, Any] | None = None + sandbox_files: list[str] = field(default_factory=list) async def _stream_agent_events( @@ -874,6 +876,12 @@ async def _stream_agent_events( om = re.search(r"\nOutput:\n([\s\S]*)", raw_text) output_text = om.group(1) if om else "" thread_id_str = config.get("configurable", {}).get("thread_id", "") + + for sf_match in re.finditer(r"^SANDBOX_FILE:\s*(.+)$", output_text, re.MULTILINE): + fpath = sf_match.group(1).strip() + if fpath and fpath not in result.sandbox_files: + result.sandbox_files.append(fpath) + yield streaming_service.format_tool_output_available( tool_call_id, { @@ -950,6 +958,33 @@ async def _stream_agent_events( yield streaming_service.format_interrupt_request(result.interrupt_value) +def _try_persist_and_delete_sandbox( + thread_id: int, + sandbox_files: list[str], +) -> None: + """Fire-and-forget: persist sandbox files locally then delete the sandbox.""" + from app.agents.new_chat.sandbox import is_sandbox_enabled, persist_and_delete_sandbox + + if not is_sandbox_enabled(): + return + + async def _run() -> None: + try: + await persist_and_delete_sandbox(thread_id, sandbox_files) + except Exception: + logging.getLogger(__name__).warning( + "persist_and_delete_sandbox failed for thread %s", + thread_id, + exc_info=True, + ) + + try: + loop = asyncio.get_running_loop() + loop.create_task(_run()) + except RuntimeError: + pass + + async def stream_new_chat( user_query: str, search_space_id: int, @@ -986,6 +1021,7 @@ async def stream_new_chat( str: SSE formatted response strings """ streaming_service = VercelStreamingService() + stream_result = StreamResult() try: # Mark AI as responding to this user for live collaboration @@ -1268,7 +1304,6 @@ async def stream_new_chat( items=initial_items, ) - stream_result = StreamResult() async for sse in _stream_agent_events( agent=agent, config=config, @@ -1382,6 +1417,8 @@ async def stream_new_chat( "Failed to clear AI responding state for thread %s", chat_id ) + _try_persist_and_delete_sandbox(chat_id, stream_result.sandbox_files) + async def stream_resume_chat( chat_id: int, @@ -1393,6 +1430,7 @@ async def stream_resume_chat( thread_visibility: ChatVisibility | None = None, ) -> AsyncGenerator[str, None]: streaming_service = VercelStreamingService() + stream_result = StreamResult() try: if user_id: @@ -1485,7 +1523,6 @@ async def stream_resume_chat( yield streaming_service.format_message_start() yield streaming_service.format_start_step() - stream_result = StreamResult() async for sse in _stream_agent_events( agent=agent, config=config, @@ -1528,3 +1565,5 @@ async def stream_resume_chat( logging.getLogger(__name__).warning( "Failed to clear AI responding state for thread %s", chat_id ) + + _try_persist_and_delete_sandbox(chat_id, stream_result.sandbox_files)