mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-20 21:18:13 +02:00
Merge remote-tracking branch 'upstream/dev' into refactor/persistent-memory
This commit is contained in:
commit
ab3cb0e1c5
63 changed files with 6320 additions and 4823 deletions
|
|
@ -25,7 +25,7 @@ from sqlalchemy import (
|
|||
)
|
||||
from sqlalchemy.dialects.postgresql import JSONB, UUID
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, declared_attr, relationship
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, backref, declared_attr, relationship
|
||||
|
||||
from app.config import config
|
||||
|
||||
|
|
@ -1086,7 +1086,9 @@ class DocumentVersion(BaseModel, TimestampMixin):
|
|||
content_hash = Column(String, nullable=False)
|
||||
title = Column(String, nullable=True)
|
||||
|
||||
document = relationship("Document", backref="versions")
|
||||
document = relationship(
|
||||
"Document", backref=backref("versions", passive_deletes=True)
|
||||
)
|
||||
|
||||
|
||||
class Chunk(BaseModel, TimestampMixin):
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ class ConnectorDocument(BaseModel):
|
|||
metadata: dict = {}
|
||||
connector_id: int | None = None
|
||||
created_by_id: str
|
||||
folder_id: int | None = None
|
||||
|
||||
@field_validator("title", "source_markdown", "unique_id", "created_by_id")
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -268,6 +268,8 @@ class IndexingPipelineService:
|
|||
):
|
||||
existing.status = DocumentStatus.pending()
|
||||
existing.updated_at = datetime.now(UTC)
|
||||
if connector_doc.folder_id is not None:
|
||||
existing.folder_id = connector_doc.folder_id
|
||||
documents.append(existing)
|
||||
log_document_requeued(ctx)
|
||||
continue
|
||||
|
|
@ -294,6 +296,8 @@ class IndexingPipelineService:
|
|||
existing.document_metadata = connector_doc.metadata
|
||||
existing.updated_at = datetime.now(UTC)
|
||||
existing.status = DocumentStatus.pending()
|
||||
if connector_doc.folder_id is not None:
|
||||
existing.folder_id = connector_doc.folder_id
|
||||
documents.append(existing)
|
||||
log_document_updated(ctx)
|
||||
continue
|
||||
|
|
@ -317,6 +321,7 @@ class IndexingPipelineService:
|
|||
created_by_id=connector_doc.created_by_id,
|
||||
updated_at=datetime.now(UTC),
|
||||
status=DocumentStatus.pending(),
|
||||
folder_id=connector_doc.folder_id,
|
||||
)
|
||||
self.session.add(document)
|
||||
documents.append(document)
|
||||
|
|
|
|||
|
|
@ -1385,45 +1385,48 @@ async def restore_document_version(
|
|||
}
|
||||
|
||||
|
||||
# ===== Local folder indexing endpoints =====
|
||||
# ===== Upload-based local folder indexing endpoints =====
|
||||
# These work for ALL deployment modes (cloud, self-hosted remote, self-hosted local).
|
||||
# The desktop app reads files locally and uploads them here.
|
||||
|
||||
|
||||
class FolderIndexRequest(PydanticBaseModel):
|
||||
folder_path: str
|
||||
class FolderMtimeCheckFile(PydanticBaseModel):
|
||||
relative_path: str
|
||||
mtime: float
|
||||
|
||||
|
||||
class FolderMtimeCheckRequest(PydanticBaseModel):
|
||||
folder_name: str
|
||||
search_space_id: int
|
||||
exclude_patterns: list[str] | None = None
|
||||
file_extensions: list[str] | None = None
|
||||
root_folder_id: int | None = None
|
||||
enable_summary: bool = False
|
||||
files: list[FolderMtimeCheckFile]
|
||||
|
||||
|
||||
class FolderIndexFilesRequest(PydanticBaseModel):
|
||||
folder_path: str
|
||||
class FolderUnlinkRequest(PydanticBaseModel):
|
||||
folder_name: str
|
||||
search_space_id: int
|
||||
target_file_paths: list[str]
|
||||
root_folder_id: int | None = None
|
||||
enable_summary: bool = False
|
||||
relative_paths: list[str]
|
||||
|
||||
|
||||
@router.post("/documents/folder-index")
|
||||
async def folder_index(
|
||||
request: FolderIndexRequest,
|
||||
class FolderSyncFinalizeRequest(PydanticBaseModel):
|
||||
folder_name: str
|
||||
search_space_id: int
|
||||
root_folder_id: int | None = None
|
||||
all_relative_paths: list[str]
|
||||
|
||||
|
||||
@router.post("/documents/folder-mtime-check")
|
||||
async def folder_mtime_check(
|
||||
request: FolderMtimeCheckRequest,
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
user: User = Depends(current_active_user),
|
||||
):
|
||||
"""Full-scan index of a local folder. Creates the root Folder row synchronously
|
||||
and dispatches the heavy indexing work to a Celery task.
|
||||
Returns the root_folder_id so the desktop can persist it.
|
||||
"""
|
||||
from app.config import config as app_config
|
||||
"""Pre-upload optimization: check which files need uploading based on mtime.
|
||||
|
||||
if not app_config.is_self_hosted():
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Local folder indexing is only available in self-hosted mode",
|
||||
)
|
||||
Returns the subset of relative paths where the file is new or has a
|
||||
different mtime, so the client can skip reading/uploading unchanged files.
|
||||
"""
|
||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||
|
||||
await check_permission(
|
||||
session,
|
||||
|
|
@ -1433,113 +1436,309 @@ async def folder_index(
|
|||
"You don't have permission to create documents in this search space",
|
||||
)
|
||||
|
||||
watched_metadata = {
|
||||
"watched": True,
|
||||
"folder_path": request.folder_path,
|
||||
"exclude_patterns": request.exclude_patterns,
|
||||
"file_extensions": request.file_extensions,
|
||||
}
|
||||
uid_hashes = {}
|
||||
for f in request.files:
|
||||
uid = f"{request.folder_name}:{f.relative_path}"
|
||||
uid_hash = compute_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE.value, uid, request.search_space_id
|
||||
)
|
||||
uid_hashes[uid_hash] = f
|
||||
|
||||
root_folder_id = request.root_folder_id
|
||||
if root_folder_id:
|
||||
existing = (
|
||||
await session.execute(select(Folder).where(Folder.id == root_folder_id))
|
||||
).scalar_one_or_none()
|
||||
if not existing:
|
||||
root_folder_id = None
|
||||
else:
|
||||
existing.folder_metadata = watched_metadata
|
||||
await session.commit()
|
||||
existing_docs = (
|
||||
(
|
||||
await session.execute(
|
||||
select(Document).where(
|
||||
Document.unique_identifier_hash.in_(list(uid_hashes.keys())),
|
||||
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
|
||||
)
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
existing_by_hash = {doc.unique_identifier_hash: doc for doc in existing_docs}
|
||||
|
||||
mtime_tolerance = 1.0
|
||||
files_to_upload: list[str] = []
|
||||
|
||||
for uid_hash, file_info in uid_hashes.items():
|
||||
doc = existing_by_hash.get(uid_hash)
|
||||
if doc is None:
|
||||
files_to_upload.append(file_info.relative_path)
|
||||
continue
|
||||
|
||||
stored_mtime = (doc.document_metadata or {}).get("mtime")
|
||||
if stored_mtime is None:
|
||||
files_to_upload.append(file_info.relative_path)
|
||||
continue
|
||||
|
||||
if abs(file_info.mtime - stored_mtime) >= mtime_tolerance:
|
||||
files_to_upload.append(file_info.relative_path)
|
||||
|
||||
return {"files_to_upload": files_to_upload}
|
||||
|
||||
|
||||
@router.post("/documents/folder-upload")
|
||||
async def folder_upload(
|
||||
files: list[UploadFile],
|
||||
folder_name: str = Form(...),
|
||||
search_space_id: int = Form(...),
|
||||
relative_paths: str = Form(...),
|
||||
root_folder_id: int | None = Form(None),
|
||||
enable_summary: bool = Form(False),
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
user: User = Depends(current_active_user),
|
||||
):
|
||||
"""Upload files from the desktop app for folder indexing.
|
||||
|
||||
Files are written to temp storage and dispatched to a Celery task.
|
||||
Works for all deployment modes (no is_self_hosted guard).
|
||||
"""
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
await check_permission(
|
||||
session,
|
||||
user,
|
||||
search_space_id,
|
||||
Permission.DOCUMENTS_CREATE.value,
|
||||
"You don't have permission to create documents in this search space",
|
||||
)
|
||||
|
||||
if not files:
|
||||
raise HTTPException(status_code=400, detail="No files provided")
|
||||
|
||||
try:
|
||||
rel_paths: list[str] = json.loads(relative_paths)
|
||||
except (json.JSONDecodeError, TypeError) as e:
|
||||
raise HTTPException(
|
||||
status_code=400, detail=f"Invalid relative_paths JSON: {e}"
|
||||
) from e
|
||||
|
||||
if len(rel_paths) != len(files):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Mismatch: {len(files)} files but {len(rel_paths)} relative_paths",
|
||||
)
|
||||
|
||||
for file in files:
|
||||
file_size = file.size or 0
|
||||
if file_size > MAX_FILE_SIZE_BYTES:
|
||||
raise HTTPException(
|
||||
status_code=413,
|
||||
detail=f"File '{file.filename}' ({file_size / (1024 * 1024):.1f} MB) "
|
||||
f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
|
||||
)
|
||||
|
||||
if not root_folder_id:
|
||||
root_folder = Folder(
|
||||
name=request.folder_name,
|
||||
search_space_id=request.search_space_id,
|
||||
created_by_id=str(user.id),
|
||||
position="a0",
|
||||
folder_metadata=watched_metadata,
|
||||
)
|
||||
session.add(root_folder)
|
||||
await session.flush()
|
||||
root_folder_id = root_folder.id
|
||||
watched_metadata = {
|
||||
"watched": True,
|
||||
"folder_path": folder_name,
|
||||
}
|
||||
existing_root = (
|
||||
await session.execute(
|
||||
select(Folder).where(
|
||||
Folder.name == folder_name,
|
||||
Folder.parent_id.is_(None),
|
||||
Folder.search_space_id == search_space_id,
|
||||
)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing_root:
|
||||
root_folder_id = existing_root.id
|
||||
existing_root.folder_metadata = watched_metadata
|
||||
else:
|
||||
root_folder = Folder(
|
||||
name=folder_name,
|
||||
search_space_id=search_space_id,
|
||||
created_by_id=str(user.id),
|
||||
position="a0",
|
||||
folder_metadata=watched_metadata,
|
||||
)
|
||||
session.add(root_folder)
|
||||
await session.flush()
|
||||
root_folder_id = root_folder.id
|
||||
|
||||
await session.commit()
|
||||
|
||||
from app.tasks.celery_tasks.document_tasks import index_local_folder_task
|
||||
async def _read_and_save(file: UploadFile, idx: int) -> dict:
|
||||
content = await file.read()
|
||||
filename = file.filename or rel_paths[idx].split("/")[-1]
|
||||
|
||||
index_local_folder_task.delay(
|
||||
search_space_id=request.search_space_id,
|
||||
def _write_temp() -> str:
|
||||
with tempfile.NamedTemporaryFile(
|
||||
delete=False, suffix=os.path.splitext(filename)[1]
|
||||
) as tmp:
|
||||
tmp.write(content)
|
||||
return tmp.name
|
||||
|
||||
temp_path = await asyncio.to_thread(_write_temp)
|
||||
return {
|
||||
"temp_path": temp_path,
|
||||
"relative_path": rel_paths[idx],
|
||||
"filename": filename,
|
||||
}
|
||||
|
||||
file_mappings = await asyncio.gather(
|
||||
*(_read_and_save(f, i) for i, f in enumerate(files))
|
||||
)
|
||||
|
||||
from app.tasks.celery_tasks.document_tasks import (
|
||||
index_uploaded_folder_files_task,
|
||||
)
|
||||
|
||||
index_uploaded_folder_files_task.delay(
|
||||
search_space_id=search_space_id,
|
||||
user_id=str(user.id),
|
||||
folder_path=request.folder_path,
|
||||
folder_name=request.folder_name,
|
||||
exclude_patterns=request.exclude_patterns,
|
||||
file_extensions=request.file_extensions,
|
||||
folder_name=folder_name,
|
||||
root_folder_id=root_folder_id,
|
||||
enable_summary=request.enable_summary,
|
||||
enable_summary=enable_summary,
|
||||
file_mappings=list(file_mappings),
|
||||
)
|
||||
|
||||
return {
|
||||
"message": "Folder indexing started",
|
||||
"message": f"Folder upload started for {len(files)} file(s)",
|
||||
"status": "processing",
|
||||
"root_folder_id": root_folder_id,
|
||||
"file_count": len(files),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/documents/folder-index-files")
|
||||
async def folder_index_files(
|
||||
request: FolderIndexFilesRequest,
|
||||
@router.post("/documents/folder-unlink")
|
||||
async def folder_unlink(
|
||||
request: FolderUnlinkRequest,
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
user: User = Depends(current_active_user),
|
||||
):
|
||||
"""Index multiple files within a watched folder (batched chokidar trigger).
|
||||
Validates that all target_file_paths are under folder_path.
|
||||
Dispatches a single Celery task that processes them in parallel.
|
||||
"""Handle file deletion events from the desktop watcher.
|
||||
|
||||
For each relative path, find the matching document and delete it.
|
||||
"""
|
||||
from app.config import config as app_config
|
||||
|
||||
if not app_config.is_self_hosted():
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Local folder indexing is only available in self-hosted mode",
|
||||
)
|
||||
|
||||
if not request.target_file_paths:
|
||||
raise HTTPException(
|
||||
status_code=400, detail="target_file_paths must not be empty"
|
||||
)
|
||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||
from app.tasks.connector_indexers.local_folder_indexer import (
|
||||
_cleanup_empty_folder_chain,
|
||||
)
|
||||
|
||||
await check_permission(
|
||||
session,
|
||||
user,
|
||||
request.search_space_id,
|
||||
Permission.DOCUMENTS_CREATE.value,
|
||||
"You don't have permission to create documents in this search space",
|
||||
Permission.DOCUMENTS_DELETE.value,
|
||||
"You don't have permission to delete documents in this search space",
|
||||
)
|
||||
|
||||
from pathlib import Path
|
||||
deleted_count = 0
|
||||
|
||||
for fp in request.target_file_paths:
|
||||
try:
|
||||
Path(fp).relative_to(request.folder_path)
|
||||
except ValueError as err:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"target_file_path {fp} must be inside folder_path",
|
||||
) from err
|
||||
for rel_path in request.relative_paths:
|
||||
unique_id = f"{request.folder_name}:{rel_path}"
|
||||
uid_hash = compute_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE.value,
|
||||
unique_id,
|
||||
request.search_space_id,
|
||||
)
|
||||
|
||||
from app.tasks.celery_tasks.document_tasks import index_local_folder_task
|
||||
existing = (
|
||||
await session.execute(
|
||||
select(Document).where(Document.unique_identifier_hash == uid_hash)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
|
||||
index_local_folder_task.delay(
|
||||
search_space_id=request.search_space_id,
|
||||
user_id=str(user.id),
|
||||
folder_path=request.folder_path,
|
||||
folder_name=request.folder_name,
|
||||
target_file_paths=request.target_file_paths,
|
||||
root_folder_id=request.root_folder_id,
|
||||
enable_summary=request.enable_summary,
|
||||
if existing:
|
||||
deleted_folder_id = existing.folder_id
|
||||
await session.delete(existing)
|
||||
await session.flush()
|
||||
|
||||
if deleted_folder_id and request.root_folder_id:
|
||||
await _cleanup_empty_folder_chain(
|
||||
session, deleted_folder_id, request.root_folder_id
|
||||
)
|
||||
deleted_count += 1
|
||||
|
||||
await session.commit()
|
||||
return {"deleted_count": deleted_count}
|
||||
|
||||
|
||||
@router.post("/documents/folder-sync-finalize")
|
||||
async def folder_sync_finalize(
|
||||
request: FolderSyncFinalizeRequest,
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
user: User = Depends(current_active_user),
|
||||
):
|
||||
"""Finalize a full folder scan by deleting orphaned documents.
|
||||
|
||||
The client sends the complete list of relative paths currently in the
|
||||
folder. Any document in the DB for this folder that is NOT in the list
|
||||
gets deleted.
|
||||
"""
|
||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||
from app.services.folder_service import get_folder_subtree_ids
|
||||
from app.tasks.connector_indexers.local_folder_indexer import (
|
||||
_cleanup_empty_folders,
|
||||
)
|
||||
|
||||
return {
|
||||
"message": f"Batch indexing started for {len(request.target_file_paths)} file(s)",
|
||||
"status": "processing",
|
||||
"file_count": len(request.target_file_paths),
|
||||
}
|
||||
await check_permission(
|
||||
session,
|
||||
user,
|
||||
request.search_space_id,
|
||||
Permission.DOCUMENTS_DELETE.value,
|
||||
"You don't have permission to delete documents in this search space",
|
||||
)
|
||||
|
||||
if not request.root_folder_id:
|
||||
return {"deleted_count": 0}
|
||||
|
||||
subtree_ids = await get_folder_subtree_ids(session, request.root_folder_id)
|
||||
|
||||
seen_hashes: set[str] = set()
|
||||
for rel_path in request.all_relative_paths:
|
||||
unique_id = f"{request.folder_name}:{rel_path}"
|
||||
uid_hash = compute_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE.value,
|
||||
unique_id,
|
||||
request.search_space_id,
|
||||
)
|
||||
seen_hashes.add(uid_hash)
|
||||
|
||||
all_folder_docs = (
|
||||
(
|
||||
await session.execute(
|
||||
select(Document).where(
|
||||
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
|
||||
Document.search_space_id == request.search_space_id,
|
||||
Document.folder_id.in_(subtree_ids),
|
||||
)
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
deleted_count = 0
|
||||
for doc in all_folder_docs:
|
||||
if doc.unique_identifier_hash not in seen_hashes:
|
||||
await session.delete(doc)
|
||||
deleted_count += 1
|
||||
|
||||
await session.flush()
|
||||
|
||||
existing_dirs: set[str] = set()
|
||||
for rel_path in request.all_relative_paths:
|
||||
parent = str(os.path.dirname(rel_path))
|
||||
if parent and parent != ".":
|
||||
existing_dirs.add(parent)
|
||||
|
||||
folder_mapping: dict[str, int] = {"": request.root_folder_id}
|
||||
|
||||
await _cleanup_empty_folders(
|
||||
session,
|
||||
request.root_folder_id,
|
||||
request.search_space_id,
|
||||
existing_dirs,
|
||||
folder_mapping,
|
||||
subtree_ids=subtree_ids,
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
return {"deleted_count": deleted_count}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,10 @@ from app.config import config
|
|||
from app.services.notification_service import NotificationService
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.tasks.celery_tasks import get_celery_session_maker
|
||||
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
|
||||
from app.tasks.connector_indexers.local_folder_indexer import (
|
||||
index_local_folder,
|
||||
index_uploaded_files,
|
||||
)
|
||||
from app.tasks.document_processors import (
|
||||
add_extension_received_document,
|
||||
add_youtube_video_document,
|
||||
|
|
@ -1411,3 +1414,132 @@ async def _index_local_folder_async(
|
|||
heartbeat_task.cancel()
|
||||
if notification_id is not None:
|
||||
_stop_heartbeat(notification_id)
|
||||
|
||||
|
||||
# ===== Upload-based folder indexing task =====
|
||||
|
||||
|
||||
@celery_app.task(name="index_uploaded_folder_files", bind=True)
|
||||
def index_uploaded_folder_files_task(
|
||||
self,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
folder_name: str,
|
||||
root_folder_id: int,
|
||||
enable_summary: bool,
|
||||
file_mappings: list[dict],
|
||||
):
|
||||
"""Celery task to index files uploaded from the desktop app."""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
_index_uploaded_folder_files_async(
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
folder_name=folder_name,
|
||||
root_folder_id=root_folder_id,
|
||||
enable_summary=enable_summary,
|
||||
file_mappings=file_mappings,
|
||||
)
|
||||
)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
async def _index_uploaded_folder_files_async(
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
folder_name: str,
|
||||
root_folder_id: int,
|
||||
enable_summary: bool,
|
||||
file_mappings: list[dict],
|
||||
):
|
||||
"""Run upload-based folder indexing with notification + heartbeat."""
|
||||
file_count = len(file_mappings)
|
||||
doc_name = f"{folder_name} ({file_count} file{'s' if file_count != 1 else ''})"
|
||||
|
||||
notification = None
|
||||
notification_id: int | None = None
|
||||
heartbeat_task = None
|
||||
|
||||
async with get_celery_session_maker()() as session:
|
||||
try:
|
||||
notification = (
|
||||
await NotificationService.document_processing.notify_processing_started(
|
||||
session=session,
|
||||
user_id=UUID(user_id),
|
||||
document_type="LOCAL_FOLDER_FILE",
|
||||
document_name=doc_name,
|
||||
search_space_id=search_space_id,
|
||||
)
|
||||
)
|
||||
notification_id = notification.id
|
||||
_start_heartbeat(notification_id)
|
||||
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification_id))
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to create notification for uploaded folder indexing",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
async def _heartbeat_progress(completed_count: int) -> None:
|
||||
if notification:
|
||||
with contextlib.suppress(Exception):
|
||||
await NotificationService.document_processing.notify_processing_progress(
|
||||
session=session,
|
||||
notification=notification,
|
||||
stage="indexing",
|
||||
stage_message=f"Syncing files ({completed_count}/{file_count})",
|
||||
)
|
||||
|
||||
try:
|
||||
_indexed, _failed, err = await index_uploaded_files(
|
||||
session=session,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
folder_name=folder_name,
|
||||
root_folder_id=root_folder_id,
|
||||
enable_summary=enable_summary,
|
||||
file_mappings=file_mappings,
|
||||
on_heartbeat_callback=_heartbeat_progress,
|
||||
)
|
||||
|
||||
if notification:
|
||||
try:
|
||||
await session.refresh(notification)
|
||||
if err:
|
||||
await NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message=err,
|
||||
)
|
||||
else:
|
||||
await NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to update notification after uploaded folder indexing",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Uploaded folder indexing failed: {e}")
|
||||
if notification:
|
||||
try:
|
||||
await session.refresh(notification)
|
||||
await NotificationService.document_processing.notify_processing_completed(
|
||||
session=session,
|
||||
notification=notification,
|
||||
error_message=str(e)[:200],
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
raise
|
||||
finally:
|
||||
if heartbeat_task:
|
||||
heartbeat_task.cancel()
|
||||
if notification_id is not None:
|
||||
_stop_heartbeat(notification_id)
|
||||
|
|
|
|||
|
|
@ -14,13 +14,14 @@ no connector row is read.
|
|||
"""
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import os
|
||||
from collections.abc import Awaitable, Callable
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db import (
|
||||
|
|
@ -178,6 +179,22 @@ def _content_hash(content: str, search_space_id: int) -> str:
|
|||
return hashlib.sha256(f"{search_space_id}:{content}".encode()).hexdigest()
|
||||
|
||||
|
||||
def _compute_raw_file_hash(file_path: str) -> str:
|
||||
"""SHA-256 hash of the raw file bytes.
|
||||
|
||||
Much cheaper than ETL/OCR extraction -- only performs sequential I/O.
|
||||
Used as a pre-filter to skip expensive content extraction when the
|
||||
underlying file hasn't changed at all.
|
||||
"""
|
||||
import hashlib
|
||||
|
||||
h = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
h.update(chunk)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
async def _compute_file_content_hash(
|
||||
file_path: str,
|
||||
filename: str,
|
||||
|
|
@ -328,6 +345,27 @@ async def _resolve_folder_for_file(
|
|||
return current_parent_id
|
||||
|
||||
|
||||
async def _set_indexing_flag(session: AsyncSession, folder_id: int) -> None:
|
||||
folder = await session.get(Folder, folder_id)
|
||||
if folder:
|
||||
meta = dict(folder.folder_metadata or {})
|
||||
meta["indexing_in_progress"] = True
|
||||
folder.folder_metadata = meta
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def _clear_indexing_flag(session: AsyncSession, folder_id: int) -> None:
|
||||
try:
|
||||
folder = await session.get(Folder, folder_id)
|
||||
if folder:
|
||||
meta = dict(folder.folder_metadata or {})
|
||||
meta.pop("indexing_in_progress", None)
|
||||
folder.folder_metadata = meta
|
||||
await session.commit()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def _cleanup_empty_folder_chain(
|
||||
session: AsyncSession,
|
||||
folder_id: int,
|
||||
|
|
@ -371,24 +409,21 @@ async def _cleanup_empty_folders(
|
|||
search_space_id: int,
|
||||
existing_dirs_on_disk: set[str],
|
||||
folder_mapping: dict[str, int],
|
||||
subtree_ids: list[int] | None = None,
|
||||
) -> None:
|
||||
"""Delete Folder rows that are empty (no docs, no children) and no longer on disk."""
|
||||
from sqlalchemy import delete as sa_delete
|
||||
|
||||
id_to_rel: dict[int, str] = {fid: rel for rel, fid in folder_mapping.items() if rel}
|
||||
|
||||
all_folders = (
|
||||
(
|
||||
await session.execute(
|
||||
select(Folder).where(
|
||||
Folder.search_space_id == search_space_id,
|
||||
Folder.id != root_folder_id,
|
||||
)
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
query = select(Folder).where(
|
||||
Folder.search_space_id == search_space_id,
|
||||
Folder.id != root_folder_id,
|
||||
)
|
||||
if subtree_ids is not None:
|
||||
query = query.where(Folder.id.in_(subtree_ids))
|
||||
|
||||
all_folders = (await session.execute(query)).scalars().all()
|
||||
|
||||
candidates: list[Folder] = []
|
||||
for folder in all_folders:
|
||||
|
|
@ -518,44 +553,50 @@ async def index_local_folder(
|
|||
# BATCH MODE (1..N files)
|
||||
# ====================================================================
|
||||
if target_file_paths:
|
||||
if len(target_file_paths) == 1:
|
||||
indexed, skipped, err = await _index_single_file(
|
||||
session=session,
|
||||
if root_folder_id:
|
||||
await _set_indexing_flag(session, root_folder_id)
|
||||
try:
|
||||
if len(target_file_paths) == 1:
|
||||
indexed, skipped, err = await _index_single_file(
|
||||
session=session,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
folder_path=folder_path,
|
||||
folder_name=folder_name,
|
||||
target_file_path=target_file_paths[0],
|
||||
enable_summary=enable_summary,
|
||||
root_folder_id=root_folder_id,
|
||||
task_logger=task_logger,
|
||||
log_entry=log_entry,
|
||||
)
|
||||
return indexed, skipped, root_folder_id, err
|
||||
|
||||
indexed, failed, err = await _index_batch_files(
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
folder_path=folder_path,
|
||||
folder_name=folder_name,
|
||||
target_file_path=target_file_paths[0],
|
||||
target_file_paths=target_file_paths,
|
||||
enable_summary=enable_summary,
|
||||
root_folder_id=root_folder_id,
|
||||
task_logger=task_logger,
|
||||
log_entry=log_entry,
|
||||
on_progress_callback=on_heartbeat_callback,
|
||||
)
|
||||
return indexed, skipped, root_folder_id, err
|
||||
|
||||
indexed, failed, err = await _index_batch_files(
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
folder_path=folder_path,
|
||||
folder_name=folder_name,
|
||||
target_file_paths=target_file_paths,
|
||||
enable_summary=enable_summary,
|
||||
root_folder_id=root_folder_id,
|
||||
on_progress_callback=on_heartbeat_callback,
|
||||
)
|
||||
if err:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Batch indexing: {indexed} indexed, {failed} failed",
|
||||
{"indexed": indexed, "failed": failed},
|
||||
)
|
||||
else:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Batch indexing complete: {indexed} indexed",
|
||||
{"indexed": indexed, "failed": failed},
|
||||
)
|
||||
return indexed, failed, root_folder_id, err
|
||||
if err:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Batch indexing: {indexed} indexed, {failed} failed",
|
||||
{"indexed": indexed, "failed": failed},
|
||||
)
|
||||
else:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Batch indexing complete: {indexed} indexed",
|
||||
{"indexed": indexed, "failed": failed},
|
||||
)
|
||||
return indexed, failed, root_folder_id, err
|
||||
finally:
|
||||
if root_folder_id:
|
||||
await _clear_indexing_flag(session, root_folder_id)
|
||||
|
||||
# ====================================================================
|
||||
# FULL-SCAN MODE
|
||||
|
|
@ -575,6 +616,7 @@ async def index_local_folder(
|
|||
exclude_patterns=exclude_patterns,
|
||||
)
|
||||
await session.flush()
|
||||
await _set_indexing_flag(session, root_folder_id)
|
||||
|
||||
try:
|
||||
files = scan_folder(folder_path, file_extensions, exclude_patterns)
|
||||
|
|
@ -582,6 +624,7 @@ async def index_local_folder(
|
|||
await task_logger.log_task_failure(
|
||||
log_entry, f"Failed to scan folder: {e}", "Scan error", {}
|
||||
)
|
||||
await _clear_indexing_flag(session, root_folder_id)
|
||||
return 0, 0, root_folder_id, f"Failed to scan folder: {e}"
|
||||
|
||||
logger.info(f"Found {len(files)} files in folder")
|
||||
|
|
@ -630,6 +673,24 @@ async def index_local_folder(
|
|||
skipped_count += 1
|
||||
continue
|
||||
|
||||
raw_hash = await asyncio.to_thread(
|
||||
_compute_raw_file_hash, file_path_abs
|
||||
)
|
||||
|
||||
stored_raw_hash = (existing_document.document_metadata or {}).get(
|
||||
"raw_file_hash"
|
||||
)
|
||||
if stored_raw_hash and stored_raw_hash == raw_hash:
|
||||
meta = dict(existing_document.document_metadata or {})
|
||||
meta["mtime"] = current_mtime
|
||||
existing_document.document_metadata = meta
|
||||
if not DocumentStatus.is_state(
|
||||
existing_document.status, DocumentStatus.READY
|
||||
):
|
||||
existing_document.status = DocumentStatus.ready()
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
estimated_pages = await _check_page_limit_or_skip(
|
||||
page_limit_service, user_id, file_path_abs
|
||||
|
|
@ -653,6 +714,7 @@ async def index_local_folder(
|
|||
if existing_document.content_hash == content_hash:
|
||||
meta = dict(existing_document.document_metadata or {})
|
||||
meta["mtime"] = current_mtime
|
||||
meta["raw_file_hash"] = raw_hash
|
||||
existing_document.document_metadata = meta
|
||||
if not DocumentStatus.is_state(
|
||||
existing_document.status, DocumentStatus.READY
|
||||
|
|
@ -687,6 +749,10 @@ async def index_local_folder(
|
|||
skipped_count += 1
|
||||
continue
|
||||
|
||||
raw_hash = await asyncio.to_thread(
|
||||
_compute_raw_file_hash, file_path_abs
|
||||
)
|
||||
|
||||
doc = _build_connector_doc(
|
||||
title=file_info["name"],
|
||||
content=content,
|
||||
|
|
@ -702,6 +768,7 @@ async def index_local_folder(
|
|||
"mtime": file_info["modified_at"].timestamp(),
|
||||
"estimated_pages": estimated_pages,
|
||||
"content_length": len(content),
|
||||
"raw_file_hash": raw_hash,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -753,29 +820,16 @@ async def index_local_folder(
|
|||
compute_unique_identifier_hash,
|
||||
)
|
||||
|
||||
pipeline = IndexingPipelineService(session)
|
||||
doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs}
|
||||
documents = await pipeline.prepare_for_indexing(connector_docs)
|
||||
|
||||
# Assign folder_id immediately so docs appear in the correct
|
||||
# folder while still pending/processing (visible via Zero sync).
|
||||
for document in documents:
|
||||
cd = doc_map.get(document.unique_identifier_hash)
|
||||
if cd is None:
|
||||
continue
|
||||
for cd in connector_docs:
|
||||
rel_path = (cd.metadata or {}).get("file_path", "")
|
||||
parent_dir = str(Path(rel_path).parent) if rel_path else ""
|
||||
if parent_dir == ".":
|
||||
parent_dir = ""
|
||||
document.folder_id = folder_mapping.get(
|
||||
parent_dir, folder_mapping.get("")
|
||||
)
|
||||
try:
|
||||
await session.commit()
|
||||
except IntegrityError:
|
||||
await session.rollback()
|
||||
for document in documents:
|
||||
await session.refresh(document)
|
||||
cd.folder_id = folder_mapping.get(parent_dir, folder_mapping.get(""))
|
||||
|
||||
pipeline = IndexingPipelineService(session)
|
||||
doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs}
|
||||
documents = await pipeline.prepare_for_indexing(connector_docs)
|
||||
|
||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
|
||||
|
|
@ -795,6 +849,7 @@ async def index_local_folder(
|
|||
|
||||
doc_meta = dict(result.document_metadata or {})
|
||||
doc_meta["mtime"] = mtime_info.get("mtime")
|
||||
doc_meta["raw_file_hash"] = mtime_info.get("raw_file_hash")
|
||||
result.document_metadata = doc_meta
|
||||
|
||||
est = mtime_info.get("estimated_pages", 1)
|
||||
|
|
@ -823,8 +878,16 @@ async def index_local_folder(
|
|||
|
||||
root_fid = folder_mapping.get("")
|
||||
if root_fid:
|
||||
from app.services.folder_service import get_folder_subtree_ids
|
||||
|
||||
subtree_ids = await get_folder_subtree_ids(session, root_fid)
|
||||
await _cleanup_empty_folders(
|
||||
session, root_fid, search_space_id, existing_dirs, folder_mapping
|
||||
session,
|
||||
root_fid,
|
||||
search_space_id,
|
||||
existing_dirs,
|
||||
folder_mapping,
|
||||
subtree_ids=subtree_ids,
|
||||
)
|
||||
|
||||
try:
|
||||
|
|
@ -851,6 +914,7 @@ async def index_local_folder(
|
|||
},
|
||||
)
|
||||
|
||||
await _clear_indexing_flag(session, root_folder_id)
|
||||
return indexed_count, skipped_count, root_folder_id, warning_message
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
|
|
@ -859,6 +923,8 @@ async def index_local_folder(
|
|||
await task_logger.log_task_failure(
|
||||
log_entry, f"DB error: {e}", "Database error", {}
|
||||
)
|
||||
if root_folder_id:
|
||||
await _clear_indexing_flag(session, root_folder_id)
|
||||
return 0, 0, root_folder_id, f"Database error: {e}"
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -866,6 +932,8 @@ async def index_local_folder(
|
|||
await task_logger.log_task_failure(
|
||||
log_entry, f"Error: {e}", "Unexpected error", {}
|
||||
)
|
||||
if root_folder_id:
|
||||
await _clear_indexing_flag(session, root_folder_id)
|
||||
return 0, 0, root_folder_id, str(e)
|
||||
|
||||
|
||||
|
|
@ -988,6 +1056,22 @@ async def _index_single_file(
|
|||
DocumentType.LOCAL_FOLDER_FILE.value, unique_id, search_space_id
|
||||
)
|
||||
|
||||
raw_hash = await asyncio.to_thread(_compute_raw_file_hash, str(full_path))
|
||||
|
||||
existing = await check_document_by_unique_identifier(session, uid_hash)
|
||||
|
||||
if existing:
|
||||
stored_raw_hash = (existing.document_metadata or {}).get("raw_file_hash")
|
||||
if stored_raw_hash and stored_raw_hash == raw_hash:
|
||||
mtime = full_path.stat().st_mtime
|
||||
meta = dict(existing.document_metadata or {})
|
||||
meta["mtime"] = mtime
|
||||
existing.document_metadata = meta
|
||||
if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
|
||||
existing.status = DocumentStatus.ready()
|
||||
await session.commit()
|
||||
return 0, 0, None
|
||||
|
||||
page_limit_service = PageLimitService(session)
|
||||
try:
|
||||
estimated_pages = await _check_page_limit_or_skip(
|
||||
|
|
@ -1006,13 +1090,12 @@ async def _index_single_file(
|
|||
if not content.strip():
|
||||
return 0, 1, None
|
||||
|
||||
existing = await check_document_by_unique_identifier(session, uid_hash)
|
||||
|
||||
if existing:
|
||||
if existing.content_hash == content_hash:
|
||||
mtime = full_path.stat().st_mtime
|
||||
meta = dict(existing.document_metadata or {})
|
||||
meta["mtime"] = mtime
|
||||
meta["raw_file_hash"] = raw_hash
|
||||
existing.document_metadata = meta
|
||||
await session.commit()
|
||||
return 0, 1, None
|
||||
|
|
@ -1031,6 +1114,11 @@ async def _index_single_file(
|
|||
enable_summary=enable_summary,
|
||||
)
|
||||
|
||||
if root_folder_id:
|
||||
connector_doc.folder_id = await _resolve_folder_for_file(
|
||||
session, rel_path, root_folder_id, search_space_id, user_id
|
||||
)
|
||||
|
||||
pipeline = IndexingPipelineService(session)
|
||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
documents = await pipeline.prepare_for_indexing([connector_doc])
|
||||
|
|
@ -1040,21 +1128,12 @@ async def _index_single_file(
|
|||
|
||||
db_doc = documents[0]
|
||||
|
||||
if root_folder_id:
|
||||
try:
|
||||
db_doc.folder_id = await _resolve_folder_for_file(
|
||||
session, rel_path, root_folder_id, search_space_id, user_id
|
||||
)
|
||||
await session.commit()
|
||||
except IntegrityError:
|
||||
await session.rollback()
|
||||
await session.refresh(db_doc)
|
||||
|
||||
await pipeline.index(db_doc, connector_doc, llm)
|
||||
|
||||
await session.refresh(db_doc)
|
||||
doc_meta = dict(db_doc.document_metadata or {})
|
||||
doc_meta["mtime"] = mtime
|
||||
doc_meta["raw_file_hash"] = raw_hash
|
||||
db_doc.document_metadata = doc_meta
|
||||
await session.commit()
|
||||
|
||||
|
|
@ -1081,3 +1160,305 @@ async def _index_single_file(
|
|||
logger.exception(f"Error indexing single file {target_file_path}: {e}")
|
||||
await session.rollback()
|
||||
return 0, 0, str(e)
|
||||
|
||||
|
||||
# ========================================================================
|
||||
# Upload-based folder indexing (works for all deployment modes)
|
||||
# ========================================================================
|
||||
|
||||
|
||||
async def _mirror_folder_structure_from_paths(
|
||||
session: AsyncSession,
|
||||
relative_paths: list[str],
|
||||
folder_name: str,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
root_folder_id: int | None = None,
|
||||
) -> tuple[dict[str, int], int]:
|
||||
"""Create DB Folder rows from a list of relative file paths.
|
||||
|
||||
Unlike ``_mirror_folder_structure`` this does not walk the filesystem;
|
||||
it derives the directory tree from the paths provided by the client.
|
||||
|
||||
Returns (mapping, root_folder_id) where mapping is
|
||||
relative_dir_path -> folder_id. The empty-string key maps to root.
|
||||
"""
|
||||
dir_set: set[str] = set()
|
||||
for rp in relative_paths:
|
||||
parent = str(Path(rp).parent)
|
||||
if parent == ".":
|
||||
continue
|
||||
parts = Path(parent).parts
|
||||
for i in range(len(parts)):
|
||||
dir_set.add(str(Path(*parts[: i + 1])))
|
||||
|
||||
subdirs = sorted(dir_set, key=lambda p: p.count(os.sep))
|
||||
|
||||
mapping: dict[str, int] = {}
|
||||
|
||||
if root_folder_id:
|
||||
existing = (
|
||||
await session.execute(select(Folder).where(Folder.id == root_folder_id))
|
||||
).scalar_one_or_none()
|
||||
if existing:
|
||||
mapping[""] = existing.id
|
||||
else:
|
||||
root_folder_id = None
|
||||
|
||||
if not root_folder_id:
|
||||
root_folder = Folder(
|
||||
name=folder_name,
|
||||
search_space_id=search_space_id,
|
||||
created_by_id=user_id,
|
||||
position="a0",
|
||||
)
|
||||
session.add(root_folder)
|
||||
await session.flush()
|
||||
mapping[""] = root_folder.id
|
||||
root_folder_id = root_folder.id
|
||||
|
||||
for rel_dir in subdirs:
|
||||
dir_parts = Path(rel_dir).parts
|
||||
dir_name = dir_parts[-1]
|
||||
parent_rel = str(Path(*dir_parts[:-1])) if len(dir_parts) > 1 else ""
|
||||
|
||||
parent_id = mapping.get(parent_rel, mapping[""])
|
||||
|
||||
existing_folder = (
|
||||
await session.execute(
|
||||
select(Folder).where(
|
||||
Folder.name == dir_name,
|
||||
Folder.parent_id == parent_id,
|
||||
Folder.search_space_id == search_space_id,
|
||||
)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if existing_folder:
|
||||
mapping[rel_dir] = existing_folder.id
|
||||
else:
|
||||
new_folder = Folder(
|
||||
name=dir_name,
|
||||
parent_id=parent_id,
|
||||
search_space_id=search_space_id,
|
||||
created_by_id=user_id,
|
||||
position="a0",
|
||||
)
|
||||
session.add(new_folder)
|
||||
await session.flush()
|
||||
mapping[rel_dir] = new_folder.id
|
||||
|
||||
await session.flush()
|
||||
return mapping, root_folder_id
|
||||
|
||||
|
||||
UPLOAD_BATCH_CONCURRENCY = 5
|
||||
|
||||
|
||||
async def index_uploaded_files(
|
||||
session: AsyncSession,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
folder_name: str,
|
||||
root_folder_id: int,
|
||||
enable_summary: bool,
|
||||
file_mappings: list[dict],
|
||||
on_heartbeat_callback: HeartbeatCallbackType | None = None,
|
||||
) -> tuple[int, int, str | None]:
|
||||
"""Index files uploaded from the desktop app via temp paths.
|
||||
|
||||
Each entry in *file_mappings* is ``{temp_path, relative_path, filename}``.
|
||||
This function mirrors the folder structure from the provided relative
|
||||
paths, then indexes each file exactly like ``_index_single_file`` but
|
||||
reads from the temp path. Temp files are cleaned up after processing.
|
||||
|
||||
Returns ``(indexed_count, failed_count, error_summary_or_none)``.
|
||||
"""
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="local_folder_indexing",
|
||||
source="uploaded_folder_indexing",
|
||||
message=f"Indexing {len(file_mappings)} uploaded file(s) for {folder_name}",
|
||||
metadata={"file_count": len(file_mappings)},
|
||||
)
|
||||
|
||||
try:
|
||||
all_relative_paths = [m["relative_path"] for m in file_mappings]
|
||||
_folder_mapping, root_folder_id = await _mirror_folder_structure_from_paths(
|
||||
session=session,
|
||||
relative_paths=all_relative_paths,
|
||||
folder_name=folder_name,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
root_folder_id=root_folder_id,
|
||||
)
|
||||
await session.flush()
|
||||
|
||||
await _set_indexing_flag(session, root_folder_id)
|
||||
|
||||
page_limit_service = PageLimitService(session)
|
||||
pipeline = IndexingPipelineService(session)
|
||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
|
||||
indexed_count = 0
|
||||
failed_count = 0
|
||||
errors: list[str] = []
|
||||
|
||||
for i, mapping in enumerate(file_mappings):
|
||||
temp_path = mapping["temp_path"]
|
||||
relative_path = mapping["relative_path"]
|
||||
filename = mapping["filename"]
|
||||
|
||||
try:
|
||||
unique_id = f"{folder_name}:{relative_path}"
|
||||
uid_hash = compute_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE.value,
|
||||
unique_id,
|
||||
search_space_id,
|
||||
)
|
||||
|
||||
raw_hash = await asyncio.to_thread(_compute_raw_file_hash, temp_path)
|
||||
|
||||
existing = await check_document_by_unique_identifier(session, uid_hash)
|
||||
|
||||
if existing:
|
||||
stored_raw_hash = (existing.document_metadata or {}).get(
|
||||
"raw_file_hash"
|
||||
)
|
||||
if stored_raw_hash and stored_raw_hash == raw_hash:
|
||||
meta = dict(existing.document_metadata or {})
|
||||
meta["mtime"] = datetime.now(UTC).timestamp()
|
||||
existing.document_metadata = meta
|
||||
if not DocumentStatus.is_state(
|
||||
existing.status, DocumentStatus.READY
|
||||
):
|
||||
existing.status = DocumentStatus.ready()
|
||||
await session.commit()
|
||||
continue
|
||||
|
||||
try:
|
||||
estimated_pages = await _check_page_limit_or_skip(
|
||||
page_limit_service, user_id, temp_path
|
||||
)
|
||||
except PageLimitExceededError:
|
||||
logger.warning(f"Page limit exceeded, skipping: {relative_path}")
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
content, content_hash = await _compute_file_content_hash(
|
||||
temp_path, filename, search_space_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not read {relative_path}: {e}")
|
||||
failed_count += 1
|
||||
errors.append(f"{filename}: {e}")
|
||||
continue
|
||||
|
||||
if not content.strip():
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
if existing:
|
||||
if existing.content_hash == content_hash:
|
||||
meta = dict(existing.document_metadata or {})
|
||||
meta["mtime"] = datetime.now(UTC).timestamp()
|
||||
meta["raw_file_hash"] = raw_hash
|
||||
existing.document_metadata = meta
|
||||
if not DocumentStatus.is_state(
|
||||
existing.status, DocumentStatus.READY
|
||||
):
|
||||
existing.status = DocumentStatus.ready()
|
||||
await session.commit()
|
||||
continue
|
||||
|
||||
await create_version_snapshot(session, existing)
|
||||
|
||||
connector_doc = _build_connector_doc(
|
||||
title=filename,
|
||||
content=content,
|
||||
relative_path=relative_path,
|
||||
folder_name=folder_name,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
)
|
||||
|
||||
connector_doc.folder_id = await _resolve_folder_for_file(
|
||||
session,
|
||||
relative_path,
|
||||
root_folder_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
)
|
||||
|
||||
documents = await pipeline.prepare_for_indexing([connector_doc])
|
||||
if not documents:
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
db_doc = documents[0]
|
||||
|
||||
await pipeline.index(db_doc, connector_doc, llm)
|
||||
|
||||
await session.refresh(db_doc)
|
||||
doc_meta = dict(db_doc.document_metadata or {})
|
||||
doc_meta["mtime"] = datetime.now(UTC).timestamp()
|
||||
doc_meta["raw_file_hash"] = raw_hash
|
||||
db_doc.document_metadata = doc_meta
|
||||
await session.commit()
|
||||
|
||||
if DocumentStatus.is_state(db_doc.status, DocumentStatus.READY):
|
||||
indexed_count += 1
|
||||
final_pages = _compute_final_pages(
|
||||
page_limit_service, estimated_pages, len(content)
|
||||
)
|
||||
await page_limit_service.update_page_usage(
|
||||
user_id, final_pages, allow_exceed=True
|
||||
)
|
||||
else:
|
||||
failed_count += 1
|
||||
|
||||
if on_heartbeat_callback and (i + 1) % 5 == 0:
|
||||
await on_heartbeat_callback(i + 1)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error indexing uploaded file {relative_path}: {e}")
|
||||
await session.rollback()
|
||||
failed_count += 1
|
||||
errors.append(f"{filename}: {e}")
|
||||
finally:
|
||||
with contextlib.suppress(OSError):
|
||||
os.unlink(temp_path)
|
||||
|
||||
error_summary = None
|
||||
if errors:
|
||||
error_summary = f"{failed_count} file(s) failed: " + "; ".join(errors[:5])
|
||||
if len(errors) > 5:
|
||||
error_summary += f" ... and {len(errors) - 5} more"
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Upload indexing complete: {indexed_count} indexed, {failed_count} failed",
|
||||
{"indexed": indexed_count, "failed": failed_count},
|
||||
)
|
||||
|
||||
return indexed_count, failed_count, error_summary
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.exception(f"Database error during uploaded file indexing: {e}")
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, f"DB error: {e}", "Database error", {}
|
||||
)
|
||||
return 0, 0, f"Database error: {e}"
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error during uploaded file indexing: {e}")
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, f"Error: {e}", "Unexpected error", {}
|
||||
)
|
||||
return 0, 0, str(e)
|
||||
|
||||
finally:
|
||||
await _clear_indexing_flag(session, root_folder_id)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[project]
|
||||
name = "surf-new-backend"
|
||||
version = "0.0.14"
|
||||
version = "0.0.15"
|
||||
description = "SurfSense Backend"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F7), Tier 5 (P1), Tier 6 (B1-B2)."""
|
||||
"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F7), Tier 5 (P1), Tier 6 (B1-B2), Tier 7 (IP1-IP3)."""
|
||||
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
|
|
@ -1178,3 +1178,131 @@ class TestPageLimits:
|
|||
await db_session.refresh(db_user)
|
||||
assert db_user.pages_used > 0
|
||||
assert db_user.pages_used <= db_user.pages_limit + 1
|
||||
|
||||
|
||||
# ====================================================================
|
||||
# Tier 7: Indexing Progress Flag (IP1-IP3)
|
||||
# ====================================================================
|
||||
|
||||
|
||||
class TestIndexingProgressFlag:
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_ip1_full_scan_clears_flag(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
db_user: User,
|
||||
db_search_space: SearchSpace,
|
||||
tmp_path: Path,
|
||||
):
|
||||
"""IP1: Full-scan mode clears indexing_in_progress after completion."""
|
||||
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
|
||||
|
||||
(tmp_path / "note.md").write_text("# Hello\n\nContent.")
|
||||
|
||||
_, _, root_folder_id, _ = await index_local_folder(
|
||||
session=db_session,
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
folder_path=str(tmp_path),
|
||||
folder_name="test-folder",
|
||||
)
|
||||
|
||||
assert root_folder_id is not None
|
||||
root_folder = (
|
||||
await db_session.execute(select(Folder).where(Folder.id == root_folder_id))
|
||||
).scalar_one()
|
||||
meta = root_folder.folder_metadata or {}
|
||||
assert "indexing_in_progress" not in meta
|
||||
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_ip2_single_file_clears_flag(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
db_user: User,
|
||||
db_search_space: SearchSpace,
|
||||
tmp_path: Path,
|
||||
):
|
||||
"""IP2: Single-file (Chokidar) mode clears indexing_in_progress after completion."""
|
||||
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
|
||||
|
||||
(tmp_path / "root.md").write_text("root")
|
||||
_, _, root_folder_id, _ = await index_local_folder(
|
||||
session=db_session,
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
folder_path=str(tmp_path),
|
||||
folder_name="test-folder",
|
||||
)
|
||||
|
||||
(tmp_path / "new.md").write_text("new file content")
|
||||
|
||||
await index_local_folder(
|
||||
session=db_session,
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
folder_path=str(tmp_path),
|
||||
folder_name="test-folder",
|
||||
target_file_paths=[str(tmp_path / "new.md")],
|
||||
root_folder_id=root_folder_id,
|
||||
)
|
||||
|
||||
root_folder = (
|
||||
await db_session.execute(select(Folder).where(Folder.id == root_folder_id))
|
||||
).scalar_one()
|
||||
meta = root_folder.folder_metadata or {}
|
||||
assert "indexing_in_progress" not in meta
|
||||
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_ip3_flag_set_during_indexing(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
db_user: User,
|
||||
db_search_space: SearchSpace,
|
||||
tmp_path: Path,
|
||||
):
|
||||
"""IP3: indexing_in_progress is True on the root folder while indexing is running."""
|
||||
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
|
||||
|
||||
(tmp_path / "note.md").write_text("# Check flag\n\nDuring indexing.")
|
||||
|
||||
from app.indexing_pipeline.indexing_pipeline_service import (
|
||||
IndexingPipelineService,
|
||||
)
|
||||
|
||||
original_index = IndexingPipelineService.index
|
||||
flag_observed = []
|
||||
|
||||
async def patched_index(self_pipe, document, connector_doc, llm):
|
||||
folder = (
|
||||
await db_session.execute(
|
||||
select(Folder).where(
|
||||
Folder.search_space_id == db_search_space.id,
|
||||
Folder.parent_id.is_(None),
|
||||
)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if folder:
|
||||
meta = folder.folder_metadata or {}
|
||||
flag_observed.append(meta.get("indexing_in_progress", False))
|
||||
return await original_index(self_pipe, document, connector_doc, llm)
|
||||
|
||||
IndexingPipelineService.index = patched_index
|
||||
try:
|
||||
_, _, root_folder_id, _ = await index_local_folder(
|
||||
session=db_session,
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
folder_path=str(tmp_path),
|
||||
folder_name="test-folder",
|
||||
)
|
||||
finally:
|
||||
IndexingPipelineService.index = original_index
|
||||
|
||||
assert len(flag_observed) > 0, "index() should have been called at least once"
|
||||
assert all(flag_observed), "indexing_in_progress should be True during indexing"
|
||||
|
||||
root_folder = (
|
||||
await db_session.execute(select(Folder).where(Folder.id == root_folder_id))
|
||||
).scalar_one()
|
||||
meta = root_folder.folder_metadata or {}
|
||||
assert "indexing_in_progress" not in meta
|
||||
|
|
|
|||
8455
surfsense_backend/uv.lock
generated
8455
surfsense_backend/uv.lock
generated
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue