feat: add folder_id support in ConnectorDocument and indexing pipeline for improved document organization

This commit is contained in:
Anish Sarkar 2026-04-08 17:48:50 +05:30
parent f3aa514240
commit a8b83dcf3f
3 changed files with 25 additions and 40 deletions

View file

@ -17,6 +17,7 @@ class ConnectorDocument(BaseModel):
metadata: dict = {}
connector_id: int | None = None
created_by_id: str
folder_id: int | None = None
@field_validator("title", "source_markdown", "unique_id", "created_by_id")
@classmethod

View file

@ -268,6 +268,8 @@ class IndexingPipelineService:
):
existing.status = DocumentStatus.pending()
existing.updated_at = datetime.now(UTC)
if connector_doc.folder_id is not None:
existing.folder_id = connector_doc.folder_id
documents.append(existing)
log_document_requeued(ctx)
continue
@ -294,6 +296,8 @@ class IndexingPipelineService:
existing.document_metadata = connector_doc.metadata
existing.updated_at = datetime.now(UTC)
existing.status = DocumentStatus.pending()
if connector_doc.folder_id is not None:
existing.folder_id = connector_doc.folder_id
documents.append(existing)
log_document_updated(ctx)
continue
@ -317,6 +321,7 @@ class IndexingPipelineService:
created_by_id=connector_doc.created_by_id,
updated_at=datetime.now(UTC),
status=DocumentStatus.pending(),
folder_id=connector_doc.folder_id,
)
self.session.add(document)
documents.append(document)

View file

@ -790,29 +790,18 @@ async def index_local_folder(
compute_unique_identifier_hash,
)
pipeline = IndexingPipelineService(session)
doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs}
documents = await pipeline.prepare_for_indexing(connector_docs)
# Assign folder_id immediately so docs appear in the correct
# folder while still pending/processing (visible via Zero sync).
for document in documents:
cd = doc_map.get(document.unique_identifier_hash)
if cd is None:
continue
for cd in connector_docs:
rel_path = (cd.metadata or {}).get("file_path", "")
parent_dir = str(Path(rel_path).parent) if rel_path else ""
if parent_dir == ".":
parent_dir = ""
document.folder_id = folder_mapping.get(
cd.folder_id = folder_mapping.get(
parent_dir, folder_mapping.get("")
)
try:
await session.commit()
except IntegrityError:
await session.rollback()
for document in documents:
await session.refresh(document)
pipeline = IndexingPipelineService(session)
doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs}
documents = await pipeline.prepare_for_indexing(connector_docs)
llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -1092,6 +1081,11 @@ async def _index_single_file(
enable_summary=enable_summary,
)
if root_folder_id:
connector_doc.folder_id = await _resolve_folder_for_file(
session, rel_path, root_folder_id, search_space_id, user_id
)
pipeline = IndexingPipelineService(session)
llm = await get_user_long_context_llm(session, user_id, search_space_id)
documents = await pipeline.prepare_for_indexing([connector_doc])
@ -1101,16 +1095,6 @@ async def _index_single_file(
db_doc = documents[0]
if root_folder_id:
try:
db_doc.folder_id = await _resolve_folder_for_file(
session, rel_path, root_folder_id, search_space_id, user_id
)
await session.commit()
except IntegrityError:
await session.rollback()
await session.refresh(db_doc)
await pipeline.index(db_doc, connector_doc, llm)
await session.refresh(db_doc)
@ -1376,6 +1360,14 @@ async def index_uploaded_files(
enable_summary=enable_summary,
)
connector_doc.folder_id = await _resolve_folder_for_file(
session,
relative_path,
root_folder_id,
search_space_id,
user_id,
)
documents = await pipeline.prepare_for_indexing([connector_doc])
if not documents:
failed_count += 1
@ -1383,19 +1375,6 @@ async def index_uploaded_files(
db_doc = documents[0]
try:
db_doc.folder_id = await _resolve_folder_for_file(
session,
relative_path,
root_folder_id,
search_space_id,
user_id,
)
await session.commit()
except IntegrityError:
await session.rollback()
await session.refresh(db_doc)
await pipeline.index(db_doc, connector_doc, llm)
await session.refresh(db_doc)