From a8b83dcf3f52346fc9297d5f834e04969216fe4a Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:48:50 +0530 Subject: [PATCH] feat: add folder_id support in ConnectorDocument and indexing pipeline for improved document organization --- .../indexing_pipeline/connector_document.py | 1 + .../indexing_pipeline_service.py | 5 ++ .../local_folder_indexer.py | 59 ++++++------------- 3 files changed, 25 insertions(+), 40 deletions(-) diff --git a/surfsense_backend/app/indexing_pipeline/connector_document.py b/surfsense_backend/app/indexing_pipeline/connector_document.py index 019efe287..4f5d6e2e0 100644 --- a/surfsense_backend/app/indexing_pipeline/connector_document.py +++ b/surfsense_backend/app/indexing_pipeline/connector_document.py @@ -17,6 +17,7 @@ class ConnectorDocument(BaseModel): metadata: dict = {} connector_id: int | None = None created_by_id: str + folder_id: int | None = None @field_validator("title", "source_markdown", "unique_id", "created_by_id") @classmethod diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 0fa4006f5..22c552e5c 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -268,6 +268,8 @@ class IndexingPipelineService: ): existing.status = DocumentStatus.pending() existing.updated_at = datetime.now(UTC) + if connector_doc.folder_id is not None: + existing.folder_id = connector_doc.folder_id documents.append(existing) log_document_requeued(ctx) continue @@ -294,6 +296,8 @@ class IndexingPipelineService: existing.document_metadata = connector_doc.metadata existing.updated_at = datetime.now(UTC) existing.status = DocumentStatus.pending() + if connector_doc.folder_id is not None: + existing.folder_id = connector_doc.folder_id documents.append(existing) log_document_updated(ctx) continue @@ -317,6 +321,7 @@ class IndexingPipelineService: created_by_id=connector_doc.created_by_id, updated_at=datetime.now(UTC), status=DocumentStatus.pending(), + folder_id=connector_doc.folder_id, ) self.session.add(document) documents.append(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py index 1d890c8d3..3360cd343 100644 --- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py @@ -790,29 +790,18 @@ async def index_local_folder( compute_unique_identifier_hash, ) - pipeline = IndexingPipelineService(session) - doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs} - documents = await pipeline.prepare_for_indexing(connector_docs) - - # Assign folder_id immediately so docs appear in the correct - # folder while still pending/processing (visible via Zero sync). - for document in documents: - cd = doc_map.get(document.unique_identifier_hash) - if cd is None: - continue + for cd in connector_docs: rel_path = (cd.metadata or {}).get("file_path", "") parent_dir = str(Path(rel_path).parent) if rel_path else "" if parent_dir == ".": parent_dir = "" - document.folder_id = folder_mapping.get( + cd.folder_id = folder_mapping.get( parent_dir, folder_mapping.get("") ) - try: - await session.commit() - except IntegrityError: - await session.rollback() - for document in documents: - await session.refresh(document) + + pipeline = IndexingPipelineService(session) + doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs} + documents = await pipeline.prepare_for_indexing(connector_docs) llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1092,6 +1081,11 @@ async def _index_single_file( enable_summary=enable_summary, ) + if root_folder_id: + connector_doc.folder_id = await _resolve_folder_for_file( + session, rel_path, root_folder_id, search_space_id, user_id + ) + pipeline = IndexingPipelineService(session) llm = await get_user_long_context_llm(session, user_id, search_space_id) documents = await pipeline.prepare_for_indexing([connector_doc]) @@ -1101,16 +1095,6 @@ async def _index_single_file( db_doc = documents[0] - if root_folder_id: - try: - db_doc.folder_id = await _resolve_folder_for_file( - session, rel_path, root_folder_id, search_space_id, user_id - ) - await session.commit() - except IntegrityError: - await session.rollback() - await session.refresh(db_doc) - await pipeline.index(db_doc, connector_doc, llm) await session.refresh(db_doc) @@ -1376,6 +1360,14 @@ async def index_uploaded_files( enable_summary=enable_summary, ) + connector_doc.folder_id = await _resolve_folder_for_file( + session, + relative_path, + root_folder_id, + search_space_id, + user_id, + ) + documents = await pipeline.prepare_for_indexing([connector_doc]) if not documents: failed_count += 1 @@ -1383,19 +1375,6 @@ async def index_uploaded_files( db_doc = documents[0] - try: - db_doc.folder_id = await _resolve_folder_for_file( - session, - relative_path, - root_folder_id, - search_space_id, - user_id, - ) - await session.commit() - except IntegrityError: - await session.rollback() - await session.refresh(db_doc) - await pipeline.index(db_doc, connector_doc, llm) await session.refresh(db_doc)