feat: implement local file persistence for Daytona sandboxes

- Added functionality to persist sandbox files locally before deletion.
- Introduced methods for retrieving and deleting locally stored sandbox files.
- Updated routes to handle local file downloads and background deletion of sandboxes.
- Enhanced streaming tasks to support sandbox file management.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-02-25 12:25:59 -08:00
parent 80be9f26c4
commit 54dd6f904c
6 changed files with 208 additions and 5 deletions

View file

@ -173,4 +173,6 @@ LANGSMITH_PROJECT=surfsense
DAYTONA_SANDBOX_ENABLED=TRUE
DAYTONA_API_KEY=dtn_asdasfasfafas
DAYTONA_API_URL=https://app.daytona.io/api
DAYTONA_TARGET=us
DAYTONA_TARGET=us
# Directory for locally-persisted sandbox files (after sandbox deletion)
SANDBOX_FILES_DIR=sandbox_files

View file

@ -6,6 +6,7 @@ __pycache__/
.flashrank_cache
surf_new_backend.egg-info/
podcasts/
sandbox_files/
temp_audio/
celerybeat-schedule*
celerybeat-schedule.*

View file

@ -4,6 +4,9 @@ Daytona sandbox provider for SurfSense deep agent.
Manages the lifecycle of sandboxed code execution environments.
Each conversation thread gets its own isolated sandbox instance
via the Daytona cloud API, identified by labels.
Files created during a session are persisted to local storage before
the sandbox is deleted so they remain downloadable after cleanup.
"""
from __future__ import annotations
@ -11,6 +14,8 @@ from __future__ import annotations
import asyncio
import logging
import os
import shutil
from pathlib import Path
from daytona import (
CreateSandboxFromSnapshotParams,
@ -18,6 +23,7 @@ from daytona import (
DaytonaConfig,
SandboxState,
)
from daytona.common.errors import DaytonaError
from deepagents.backends.protocol import ExecuteResponse
from langchain_daytona import DaytonaSandbox
@ -140,6 +146,10 @@ async def delete_sandbox(thread_id: int | str) -> None:
labels = {THREAD_LABEL_KEY: str(thread_id)}
try:
sandbox = client.find_one(labels=labels)
except DaytonaError:
logger.debug("No sandbox to delete for thread %s (already removed)", thread_id)
return
try:
client.delete(sandbox)
logger.info("Sandbox deleted: %s", sandbox.id)
except Exception:
@ -150,3 +160,104 @@ async def delete_sandbox(thread_id: int | str) -> None:
)
await asyncio.to_thread(_delete)
# ---------------------------------------------------------------------------
# Local file persistence
# ---------------------------------------------------------------------------
def _get_sandbox_files_dir() -> Path:
return Path(os.environ.get("SANDBOX_FILES_DIR", "sandbox_files"))
def _local_path_for(thread_id: int | str, sandbox_path: str) -> Path:
"""Map a sandbox-internal absolute path to a local filesystem path."""
relative = sandbox_path.lstrip("/")
return _get_sandbox_files_dir() / str(thread_id) / relative
def get_local_sandbox_file(thread_id: int | str, sandbox_path: str) -> bytes | None:
"""Read a previously-persisted sandbox file from local storage.
Returns the file bytes, or *None* if the file does not exist locally.
"""
local = _local_path_for(thread_id, sandbox_path)
if local.is_file():
return local.read_bytes()
return None
def delete_local_sandbox_files(thread_id: int | str) -> None:
"""Remove all locally-persisted sandbox files for a thread."""
thread_dir = _get_sandbox_files_dir() / str(thread_id)
if thread_dir.is_dir():
shutil.rmtree(thread_dir, ignore_errors=True)
logger.info("Deleted local sandbox files for thread %s", thread_id)
async def persist_and_delete_sandbox(
thread_id: int | str,
sandbox_file_paths: list[str],
) -> None:
"""Download sandbox files to local storage, then delete the sandbox.
Each file in *sandbox_file_paths* is downloaded from the Daytona
sandbox and saved under ``{SANDBOX_FILES_DIR}/{thread_id}/``.
Per-file errors are logged but do **not** prevent the sandbox from
being deleted freeing Daytona storage is the priority.
"""
def _persist_and_delete() -> None:
client = _get_client()
labels = {THREAD_LABEL_KEY: str(thread_id)}
try:
sandbox = client.find_one(labels=labels)
except Exception:
logger.info(
"No sandbox found for thread %s — nothing to persist", thread_id
)
return
# Ensure the sandbox is running so we can download files
if sandbox.state != SandboxState.STARTED:
try:
sandbox.start(timeout=60)
except Exception:
logger.warning(
"Could not start sandbox %s for file download — deleting anyway",
sandbox.id,
exc_info=True,
)
try:
client.delete(sandbox)
except Exception:
pass
return
for path in sandbox_file_paths:
try:
content: bytes = sandbox.fs.download_file(path)
local = _local_path_for(thread_id, path)
local.parent.mkdir(parents=True, exist_ok=True)
local.write_bytes(content)
logger.info("Persisted sandbox file %s%s", path, local)
except Exception:
logger.warning(
"Failed to persist sandbox file %s for thread %s",
path,
thread_id,
exc_info=True,
)
try:
client.delete(sandbox)
logger.info("Sandbox deleted after file persistence: %s", sandbox.id)
except Exception:
logger.warning(
"Failed to delete sandbox %s after persistence",
sandbox.id,
exc_info=True,
)
await asyncio.to_thread(_persist_and_delete)

View file

@ -52,9 +52,42 @@ from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat
from app.users import current_active_user
from app.utils.rbac import check_permission
import asyncio
import logging
_logger = logging.getLogger(__name__)
router = APIRouter()
def _try_delete_sandbox(thread_id: int) -> None:
"""Fire-and-forget sandbox + local file deletion so the HTTP response isn't blocked."""
from app.agents.new_chat.sandbox import (
delete_local_sandbox_files,
delete_sandbox,
is_sandbox_enabled,
)
if not is_sandbox_enabled():
return
async def _bg() -> None:
try:
await delete_sandbox(thread_id)
except Exception:
_logger.warning("Background sandbox delete failed for thread %s", thread_id, exc_info=True)
try:
delete_local_sandbox_files(thread_id)
except Exception:
_logger.warning("Local sandbox file cleanup failed for thread %s", thread_id, exc_info=True)
try:
loop = asyncio.get_running_loop()
loop.create_task(_bg())
except RuntimeError:
pass
async def check_thread_access(
session: AsyncSession,
thread: NewChatThread,
@ -648,6 +681,9 @@ async def delete_thread(
await session.delete(db_thread)
await session.commit()
_try_delete_sandbox(thread_id)
return {"message": "Thread deleted successfully"}
except HTTPException:

View file

@ -71,9 +71,23 @@ async def download_sandbox_file(
"You don't have permission to access files in this thread",
)
from app.agents.new_chat.sandbox import get_local_sandbox_file
# Prefer locally-persisted copy (sandbox may already be deleted)
local_content = get_local_sandbox_file(thread_id, path)
if local_content is not None:
filename = path.rsplit("/", 1)[-1] if "/" in path else path
media_type = _guess_media_type(filename)
return Response(
content=local_content,
media_type=media_type,
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
# Fall back to live sandbox download
try:
sandbox = await get_or_create_sandbox(thread_id)
raw_sandbox = sandbox._sandbox
raw_sandbox = sandbox._sandbox # noqa: SLF001
content: bytes = await asyncio.to_thread(raw_sandbox.fs.download_file, path)
except Exception as exc:
logger.warning("Sandbox file download failed for %s: %s", path, exc)

View file

@ -9,11 +9,12 @@ Supports loading LLM configurations from:
- NewLLMConfig database table (positive IDs for user-created configs with prompt settings)
"""
import asyncio
import json
import logging
import re
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Any
from uuid import UUID
@ -193,6 +194,7 @@ class StreamResult:
accumulated_text: str = ""
is_interrupted: bool = False
interrupt_value: dict[str, Any] | None = None
sandbox_files: list[str] = field(default_factory=list)
async def _stream_agent_events(
@ -874,6 +876,12 @@ async def _stream_agent_events(
om = re.search(r"\nOutput:\n([\s\S]*)", raw_text)
output_text = om.group(1) if om else ""
thread_id_str = config.get("configurable", {}).get("thread_id", "")
for sf_match in re.finditer(r"^SANDBOX_FILE:\s*(.+)$", output_text, re.MULTILINE):
fpath = sf_match.group(1).strip()
if fpath and fpath not in result.sandbox_files:
result.sandbox_files.append(fpath)
yield streaming_service.format_tool_output_available(
tool_call_id,
{
@ -950,6 +958,33 @@ async def _stream_agent_events(
yield streaming_service.format_interrupt_request(result.interrupt_value)
def _try_persist_and_delete_sandbox(
thread_id: int,
sandbox_files: list[str],
) -> None:
"""Fire-and-forget: persist sandbox files locally then delete the sandbox."""
from app.agents.new_chat.sandbox import is_sandbox_enabled, persist_and_delete_sandbox
if not is_sandbox_enabled():
return
async def _run() -> None:
try:
await persist_and_delete_sandbox(thread_id, sandbox_files)
except Exception:
logging.getLogger(__name__).warning(
"persist_and_delete_sandbox failed for thread %s",
thread_id,
exc_info=True,
)
try:
loop = asyncio.get_running_loop()
loop.create_task(_run())
except RuntimeError:
pass
async def stream_new_chat(
user_query: str,
search_space_id: int,
@ -986,6 +1021,7 @@ async def stream_new_chat(
str: SSE formatted response strings
"""
streaming_service = VercelStreamingService()
stream_result = StreamResult()
try:
# Mark AI as responding to this user for live collaboration
@ -1268,7 +1304,6 @@ async def stream_new_chat(
items=initial_items,
)
stream_result = StreamResult()
async for sse in _stream_agent_events(
agent=agent,
config=config,
@ -1382,6 +1417,8 @@ async def stream_new_chat(
"Failed to clear AI responding state for thread %s", chat_id
)
_try_persist_and_delete_sandbox(chat_id, stream_result.sandbox_files)
async def stream_resume_chat(
chat_id: int,
@ -1393,6 +1430,7 @@ async def stream_resume_chat(
thread_visibility: ChatVisibility | None = None,
) -> AsyncGenerator[str, None]:
streaming_service = VercelStreamingService()
stream_result = StreamResult()
try:
if user_id:
@ -1485,7 +1523,6 @@ async def stream_resume_chat(
yield streaming_service.format_message_start()
yield streaming_service.format_start_step()
stream_result = StreamResult()
async for sse in _stream_agent_events(
agent=agent,
config=config,
@ -1528,3 +1565,5 @@ async def stream_resume_chat(
logging.getLogger(__name__).warning(
"Failed to clear AI responding state for thread %s", chat_id
)
_try_persist_and_delete_sandbox(chat_id, stream_result.sandbox_files)