mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-08 20:25:19 +02:00
refactor: streamline local folder indexing logic by removing unused imports, enhancing content hashing, and improving document creation process
This commit is contained in:
parent
c27d24a117
commit
53df393cf7
2 changed files with 174 additions and 380 deletions
|
|
@ -14,7 +14,6 @@ no connector row is read.
|
|||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
from collections.abc import Awaitable, Callable
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
|
@ -30,24 +29,16 @@ from app.db import (
|
|||
DocumentType,
|
||||
Folder,
|
||||
)
|
||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
embed_text,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
generate_unique_identifier_hash,
|
||||
)
|
||||
from app.utils.document_versioning import create_version_snapshot
|
||||
|
||||
from .base import (
|
||||
build_document_metadata_string,
|
||||
check_document_by_unique_identifier,
|
||||
check_duplicate_document_by_hash,
|
||||
get_current_timestamp,
|
||||
logger,
|
||||
safe_set_chunks,
|
||||
)
|
||||
|
||||
PLAINTEXT_EXTENSIONS = frozenset({
|
||||
|
|
@ -89,7 +80,6 @@ def _needs_etl(filename: str) -> bool:
|
|||
return not _is_plaintext_file(filename) and not _is_audio_file(filename)
|
||||
|
||||
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
|
||||
HEARTBEAT_INTERVAL_SECONDS = 30
|
||||
|
||||
DEFAULT_EXCLUDE_PATTERNS = [
|
||||
".git",
|
||||
|
|
@ -210,6 +200,16 @@ async def _read_file_content(file_path: str, filename: str) -> str:
|
|||
return await _parse_file_to_markdown(file_path, filename)
|
||||
|
||||
|
||||
def _content_hash(content: str, search_space_id: int) -> str:
    """Return the hex SHA-256 digest of ``content`` namespaced by search space.

    The digest is computed over the UTF-8 bytes of the string
    ``"<search_space_id>:<content>"``, so the same text stored in two
    different search spaces yields two different hashes.

    Per the original author's note this is meant to mirror the format of
    ``compute_content_hash`` in the unified pipeline so dedup checks agree;
    that helper is not visible here, so treat the equivalence as an
    assumption to confirm.

    Args:
        content: Extracted text of the file.
        search_space_id: Identifier of the search space the file belongs to.

    Returns:
        A 64-character lowercase hexadecimal digest.
    """
    from hashlib import sha256

    # Prefix with the search space id so hashes never collide across spaces.
    scoped_payload = f"{search_space_id}:{content}"
    digest = sha256(scoped_payload.encode("utf-8"))
    return digest.hexdigest()
|
||||
|
||||
|
||||
async def _compute_file_content_hash(
|
||||
file_path: str, filename: str, search_space_id: int,
|
||||
) -> tuple[str, str]:
|
||||
|
|
@ -218,8 +218,7 @@ async def _compute_file_content_hash(
|
|||
Returns (content_text, content_hash).
|
||||
"""
|
||||
content = await _read_file_content(file_path, filename)
|
||||
content_hash = generate_content_hash(content, search_space_id)
|
||||
return content, content_hash
|
||||
return content, _content_hash(content, search_space_id)
|
||||
|
||||
|
||||
async def _mirror_folder_structure(
|
||||
|
|
@ -454,6 +453,40 @@ async def _cleanup_empty_folders(
|
|||
candidates = remaining
|
||||
|
||||
|
||||
def _build_connector_doc(
    title: str,
    content: str,
    relative_path: str,
    folder_name: str,
    *,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Wrap a local file's extracted text in a ``ConnectorDocument``.

    Args:
        title: Display title for the document (callers pass the file name).
        content: Extracted text of the file; stored as ``source_markdown``.
        relative_path: Path of the file relative to the indexed folder root.
        folder_name: Name of the indexed folder; combined with
            ``relative_path`` to form the document's ``unique_id``.
        search_space_id: Search space the document belongs to.
        user_id: Recorded as ``created_by_id``.
        enable_summary: Forwarded as ``should_summarize``.

    Returns:
        A ``ConnectorDocument`` of type ``DocumentType.LOCAL_FOLDER_FILE``
        with ``connector_id`` set to ``None``.
    """
    return ConnectorDocument(
        title=title,
        source_markdown=content,
        # Same "<folder>:<relative path>" scheme the indexer uses when it
        # computes identifier hashes for lookups.
        unique_id=f"{folder_name}:{relative_path}",
        document_type=DocumentType.LOCAL_FOLDER_FILE,
        search_space_id=search_space_id,
        connector_id=None,
        created_by_id=user_id,
        should_summarize=enable_summary,
        # Title plus at most the first 4000 characters of the content.
        fallback_summary=f"File: {title}\n\n{content[:4000]}",
        metadata={
            "folder_name": folder_name,
            "file_path": relative_path,
            "document_type": "Local Folder File",
            "connector_type": "Local Folder",
        },
    )
|
||||
|
||||
|
||||
async def index_local_folder(
|
||||
session: AsyncSession,
|
||||
search_space_id: int,
|
||||
|
|
@ -551,15 +584,13 @@ async def index_local_folder(
|
|||
indexed_count = 0
|
||||
skipped_count = 0
|
||||
failed_count = 0
|
||||
duplicate_count = 0
|
||||
|
||||
last_heartbeat_time = time.time()
|
||||
|
||||
# ================================================================
|
||||
# PHASE 1: Analyze all files, create pending documents
|
||||
# PHASE 1: Pre-filter files (mtime / content-hash), version changed
|
||||
# ================================================================
|
||||
files_to_process: list[dict] = []
|
||||
new_documents_created = False
|
||||
connector_docs: list[ConnectorDocument] = []
|
||||
# Maps unique_id -> {"relative_path": ..., "mtime": ...}; read after the pipeline runs to stamp mtime onto each document
|
||||
file_meta_map: dict[str, dict] = {}
|
||||
seen_unique_hashes: set[str] = set()
|
||||
|
||||
for file_info in files:
|
||||
|
|
@ -568,8 +599,8 @@ async def index_local_folder(
|
|||
file_path_abs = file_info["path"]
|
||||
|
||||
unique_identifier = f"{folder_name}:{relative_path}"
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE,
|
||||
unique_identifier_hash = compute_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE.value,
|
||||
unique_identifier,
|
||||
search_space_id,
|
||||
)
|
||||
|
|
@ -612,94 +643,42 @@ async def index_local_folder(
|
|||
continue
|
||||
|
||||
await create_version_snapshot(session, existing_document)
|
||||
else:
|
||||
try:
|
||||
content, content_hash = await _compute_file_content_hash(
|
||||
file_path_abs, file_info["relative_path"], search_space_id
|
||||
)
|
||||
except Exception as read_err:
|
||||
logger.warning(f"Could not read {file_path_abs}: {read_err}")
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
files_to_process.append(
|
||||
{
|
||||
"document": existing_document,
|
||||
"is_new": False,
|
||||
"file_info": file_info,
|
||||
"content": content,
|
||||
"content_hash": content_hash,
|
||||
"unique_identifier_hash": unique_identifier_hash,
|
||||
"relative_path": relative_path,
|
||||
"title": file_info["name"],
|
||||
}
|
||||
)
|
||||
continue
|
||||
if not content.strip():
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
content, content_hash = await _compute_file_content_hash(
|
||||
file_path_abs, file_info["relative_path"], search_space_id
|
||||
)
|
||||
except Exception as read_err:
|
||||
logger.warning(f"Could not read {file_path_abs}: {read_err}")
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
if not content.strip():
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
with session.no_autoflush:
|
||||
dup = await check_duplicate_document_by_hash(session, content_hash)
|
||||
if dup:
|
||||
duplicate_count += 1
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
parent_dir = str(Path(relative_path).parent)
|
||||
if parent_dir == ".":
|
||||
parent_dir = ""
|
||||
folder_id = folder_mapping.get(parent_dir, folder_mapping.get(""))
|
||||
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
doc = _build_connector_doc(
|
||||
title=file_info["name"],
|
||||
document_type=DocumentType.LOCAL_FOLDER_FILE,
|
||||
document_metadata={
|
||||
"folder_name": folder_name,
|
||||
"file_path": relative_path,
|
||||
"mtime": file_info["modified_at"].timestamp(),
|
||||
},
|
||||
content="Pending...",
|
||||
content_hash=unique_identifier_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=None,
|
||||
status=DocumentStatus.pending(),
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=user_id,
|
||||
connector_id=None,
|
||||
folder_id=folder_id,
|
||||
)
|
||||
session.add(document)
|
||||
new_documents_created = True
|
||||
|
||||
files_to_process.append(
|
||||
{
|
||||
"document": document,
|
||||
"is_new": True,
|
||||
"file_info": file_info,
|
||||
"content": content,
|
||||
"content_hash": content_hash,
|
||||
"unique_identifier_hash": unique_identifier_hash,
|
||||
"relative_path": relative_path,
|
||||
"title": file_info["name"],
|
||||
}
|
||||
content=content,
|
||||
relative_path=relative_path,
|
||||
folder_name=folder_name,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
)
|
||||
connector_docs.append(doc)
|
||||
file_meta_map[unique_identifier] = {
|
||||
"relative_path": relative_path,
|
||||
"mtime": file_info["modified_at"].timestamp(),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Phase 1 error for {file_info.get('path')}: {e}")
|
||||
failed_count += 1
|
||||
|
||||
if new_documents_created:
|
||||
await session.commit()
|
||||
|
||||
# ================================================================
|
||||
# PHASE 1.5: Delete documents no longer on disk
|
||||
# ================================================================
|
||||
# Collect ALL folder IDs under this root (including folders that no
|
||||
# longer exist on disk but still have rows in the DB) so we catch
|
||||
# documents in deleted directories too.
|
||||
all_root_folder_ids = set(folder_mapping.values())
|
||||
all_db_folders = (
|
||||
await session.execute(
|
||||
|
|
@ -727,98 +706,51 @@ async def index_local_folder(
|
|||
await session.flush()
|
||||
|
||||
# ================================================================
|
||||
# PHASE 2: Process each document
|
||||
# PHASE 2: Index via unified pipeline
|
||||
# ================================================================
|
||||
long_context_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
if connector_docs:
|
||||
from app.indexing_pipeline.document_hashing import (
|
||||
compute_unique_identifier_hash,
|
||||
)
|
||||
|
||||
for item in files_to_process:
|
||||
if on_heartbeat_callback:
|
||||
current_time = time.time()
|
||||
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
|
||||
pipeline = IndexingPipelineService(session)
|
||||
doc_map = {
|
||||
compute_unique_identifier_hash(cd): cd for cd in connector_docs
|
||||
}
|
||||
documents = await pipeline.prepare_for_indexing(connector_docs)
|
||||
|
||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
|
||||
for document in documents:
|
||||
connector_doc = doc_map.get(document.unique_identifier_hash)
|
||||
if connector_doc is None:
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
result = await pipeline.index(document, connector_doc, llm)
|
||||
|
||||
if DocumentStatus.is_state(result.status, DocumentStatus.READY):
|
||||
indexed_count += 1
|
||||
|
||||
# Assign folder_id and mtime post-pipeline
|
||||
rel_path = (connector_doc.metadata or {}).get("file_path", "")
|
||||
parent_dir = str(Path(rel_path).parent) if rel_path else ""
|
||||
if parent_dir == ".":
|
||||
parent_dir = ""
|
||||
fid = folder_mapping.get(parent_dir, folder_mapping.get(""))
|
||||
|
||||
unique_id = connector_doc.unique_id
|
||||
mtime_info = file_meta_map.get(unique_id, {})
|
||||
|
||||
result.folder_id = fid
|
||||
doc_meta = dict(result.document_metadata or {})
|
||||
doc_meta["mtime"] = mtime_info.get("mtime")
|
||||
result.document_metadata = doc_meta
|
||||
else:
|
||||
failed_count += 1
|
||||
|
||||
if on_heartbeat_callback and indexed_count % 5 == 0:
|
||||
await on_heartbeat_callback(indexed_count)
|
||||
last_heartbeat_time = current_time
|
||||
|
||||
document = item["document"]
|
||||
try:
|
||||
document.status = DocumentStatus.processing()
|
||||
await session.commit()
|
||||
|
||||
title = item["title"]
|
||||
relative_path = item["relative_path"]
|
||||
content = item["content"]
|
||||
content_hash = item["content_hash"]
|
||||
file_info = item["file_info"]
|
||||
|
||||
metadata_sections = [
|
||||
(
|
||||
"METADATA",
|
||||
[
|
||||
f"Title: {title}",
|
||||
f"Folder: {folder_name}",
|
||||
f"Path: {relative_path}",
|
||||
],
|
||||
),
|
||||
("CONTENT", [content]),
|
||||
]
|
||||
document_string = build_document_metadata_string(metadata_sections)
|
||||
|
||||
summary_content = ""
|
||||
if long_context_llm and enable_summary:
|
||||
doc_meta = {
|
||||
"folder_name": folder_name,
|
||||
"file_path": relative_path,
|
||||
}
|
||||
summary_content, _ = await generate_document_summary(
|
||||
document_string, long_context_llm, doc_meta
|
||||
)
|
||||
|
||||
embedding = embed_text(document_string)
|
||||
chunks = await create_document_chunks(document_string)
|
||||
|
||||
parent_dir = str(Path(relative_path).parent)
|
||||
if parent_dir == ".":
|
||||
parent_dir = ""
|
||||
folder_id = folder_mapping.get(parent_dir, folder_mapping.get(""))
|
||||
|
||||
document.title = title
|
||||
document.content = document_string
|
||||
document.content_hash = content_hash
|
||||
document.source_markdown = content
|
||||
document.embedding = embedding
|
||||
document.document_metadata = {
|
||||
"folder_name": folder_name,
|
||||
"file_path": relative_path,
|
||||
"summary": summary_content,
|
||||
"mtime": file_info["modified_at"].timestamp(),
|
||||
}
|
||||
document.folder_id = folder_id
|
||||
await safe_set_chunks(session, document, chunks)
|
||||
document.updated_at = get_current_timestamp()
|
||||
document.status = DocumentStatus.ready()
|
||||
|
||||
indexed_count += 1
|
||||
|
||||
if indexed_count % 10 == 0:
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Phase 2 error for {item.get('relative_path')}: {e}")
|
||||
try:
|
||||
await session.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
document.status = DocumentStatus.failed(str(e)[:500])
|
||||
document.updated_at = get_current_timestamp()
|
||||
await session.commit()
|
||||
except Exception:
|
||||
try:
|
||||
await session.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
failed_count += 1
|
||||
|
||||
# Cleanup empty folders
|
||||
existing_dirs = set()
|
||||
|
|
@ -846,8 +778,6 @@ async def index_local_folder(
|
|||
raise
|
||||
|
||||
warning_parts = []
|
||||
if duplicate_count > 0:
|
||||
warning_parts.append(f"{duplicate_count} duplicate")
|
||||
if failed_count > 0:
|
||||
warning_parts.append(f"{failed_count} failed")
|
||||
warning_message = ", ".join(warning_parts) if warning_parts else None
|
||||
|
|
@ -859,7 +789,6 @@ async def index_local_folder(
|
|||
"indexed": indexed_count,
|
||||
"skipped": skipped_count,
|
||||
"failed": failed_count,
|
||||
"duplicates": duplicate_count,
|
||||
},
|
||||
)
|
||||
|
||||
|
|
@ -899,8 +828,8 @@ async def _index_single_file(
|
|||
if not full_path.exists():
|
||||
rel = str(full_path.relative_to(folder_path))
|
||||
unique_id = f"{folder_name}:{rel}"
|
||||
uid_hash = generate_unique_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE, unique_id, search_space_id
|
||||
uid_hash = compute_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE.value, unique_id, search_space_id
|
||||
)
|
||||
existing = await check_document_by_unique_identifier(session, uid_hash)
|
||||
if existing:
|
||||
|
|
@ -918,8 +847,8 @@ async def _index_single_file(
|
|||
rel_path = str(full_path.relative_to(folder_path))
|
||||
|
||||
unique_id = f"{folder_name}:{rel_path}"
|
||||
uid_hash = generate_unique_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE, unique_id, search_space_id
|
||||
uid_hash = compute_identifier_hash(
|
||||
DocumentType.LOCAL_FOLDER_FILE.value, unique_id, search_space_id
|
||||
)
|
||||
|
||||
try:
|
||||
|
|
@ -945,83 +874,51 @@ async def _index_single_file(
|
|||
|
||||
await create_version_snapshot(session, existing)
|
||||
|
||||
long_context_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
title = full_path.name
|
||||
mtime = full_path.stat().st_mtime
|
||||
|
||||
metadata_sections = [
|
||||
("METADATA", [f"Title: {title}", f"Folder: {folder_name}", f"Path: {rel_path}"]),
|
||||
("CONTENT", [content]),
|
||||
]
|
||||
document_string = build_document_metadata_string(metadata_sections)
|
||||
connector_doc = _build_connector_doc(
|
||||
title=full_path.name,
|
||||
content=content,
|
||||
relative_path=rel_path,
|
||||
folder_name=folder_name,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
)
|
||||
|
||||
summary_content = ""
|
||||
if long_context_llm and enable_summary:
|
||||
summary_content, _ = await generate_document_summary(
|
||||
document_string, long_context_llm, {"folder_name": folder_name, "file_path": rel_path}
|
||||
)
|
||||
pipeline = IndexingPipelineService(session)
|
||||
llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
documents = await pipeline.prepare_for_indexing([connector_doc])
|
||||
|
||||
embedding = embed_text(document_string)
|
||||
chunks = await create_document_chunks(document_string)
|
||||
if not documents:
|
||||
return 0, 1, None
|
||||
|
||||
doc_metadata = {
|
||||
"folder_name": folder_name,
|
||||
"file_path": rel_path,
|
||||
"summary": summary_content,
|
||||
"mtime": mtime,
|
||||
}
|
||||
db_doc = documents[0]
|
||||
await pipeline.index(db_doc, connector_doc, llm)
|
||||
|
||||
# Post-pipeline: assign folder_id and mtime
|
||||
await session.refresh(db_doc)
|
||||
folder_id = None
|
||||
if root_folder_id:
|
||||
folder_id = await _resolve_folder_for_file(
|
||||
session, rel_path, root_folder_id, search_space_id, user_id
|
||||
)
|
||||
|
||||
if existing:
|
||||
existing.title = title
|
||||
existing.content = document_string
|
||||
existing.content_hash = content_hash
|
||||
existing.source_markdown = content
|
||||
existing.embedding = embedding
|
||||
existing.document_metadata = doc_metadata
|
||||
existing.folder_id = folder_id
|
||||
await safe_set_chunks(session, existing, chunks)
|
||||
existing.updated_at = get_current_timestamp()
|
||||
existing.status = DocumentStatus.ready()
|
||||
else:
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=title,
|
||||
document_type=DocumentType.LOCAL_FOLDER_FILE,
|
||||
document_metadata=doc_metadata,
|
||||
content=document_string,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=uid_hash,
|
||||
source_markdown=content,
|
||||
embedding=embedding,
|
||||
status=DocumentStatus.ready(),
|
||||
updated_at=get_current_timestamp(),
|
||||
created_by_id=user_id,
|
||||
connector_id=None,
|
||||
folder_id=folder_id,
|
||||
)
|
||||
session.add(document)
|
||||
await session.flush()
|
||||
for chunk in chunks:
|
||||
chunk.document_id = document.id
|
||||
session.add_all(chunks)
|
||||
|
||||
db_doc.folder_id = folder_id
|
||||
doc_meta = dict(db_doc.document_metadata or {})
|
||||
doc_meta["mtime"] = mtime
|
||||
db_doc.document_metadata = doc_meta
|
||||
await session.commit()
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Single file indexed: {rel_path}",
|
||||
{"file": rel_path},
|
||||
)
|
||||
return 1, 0, None
|
||||
indexed = 1 if DocumentStatus.is_state(db_doc.status, DocumentStatus.READY) else 0
|
||||
failed_msg = None if indexed else "Indexing failed"
|
||||
|
||||
if indexed:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Single file indexed: {rel_path}",
|
||||
{"file": rel_path},
|
||||
)
|
||||
return indexed, 0 if indexed else 1, failed_msg
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error indexing single file {target_file_path}: {e}")
|
||||
|
|
|
|||
|
|
@ -1,8 +1,7 @@
|
|||
"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F5), Tier 5 (P1)."""
|
||||
"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F7), Tier 5 (P1)."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import func, select
|
||||
|
|
@ -18,41 +17,11 @@ from app.db import (
|
|||
User,
|
||||
)
|
||||
|
||||
import app.tasks.connector_indexers.local_folder_indexer as _lfi_mod
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_self_hosted(monkeypatch):
|
||||
_cfg = type("_Cfg", (), {"is_self_hosted": staticmethod(lambda: True)})()
|
||||
monkeypatch.setattr(_lfi_mod, "config", _cfg)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_embed_for_indexer(monkeypatch):
|
||||
from app.config import config as app_config
|
||||
dim = app_config.embedding_model_instance.dimension
|
||||
mock = MagicMock(return_value=[0.1] * dim)
|
||||
monkeypatch.setattr(_lfi_mod, "embed_text", mock)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_chunks_for_indexer(monkeypatch):
|
||||
from app.db import Chunk
|
||||
from app.config import config as app_config
|
||||
dim = app_config.embedding_model_instance.dimension
|
||||
|
||||
async def mock_create_chunks(text):
|
||||
return [Chunk(content="chunk", embedding=[0.1] * dim)]
|
||||
|
||||
monkeypatch.setattr(_lfi_mod, "create_document_chunks", mock_create_chunks)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_summary_for_indexer(monkeypatch):
|
||||
monkeypatch.setattr(_lfi_mod, "get_user_long_context_llm", AsyncMock(return_value=None))
|
||||
UNIFIED_FIXTURES = (
|
||||
"patched_summarize", "patched_embed_texts", "patched_chunk_text",
|
||||
)
|
||||
|
||||
|
||||
# ====================================================================
|
||||
|
|
@ -62,12 +31,7 @@ def patched_summary_for_indexer(monkeypatch):
|
|||
|
||||
class TestFullIndexer:
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_i1_new_file_indexed(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -103,12 +67,7 @@ class TestFullIndexer:
|
|||
assert docs[0].document_type == DocumentType.LOCAL_FOLDER_FILE
|
||||
assert DocumentStatus.is_state(docs[0].status, DocumentStatus.READY)
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_i2_unchanged_skipped(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -130,7 +89,6 @@ class TestFullIndexer:
|
|||
)
|
||||
assert count1 == 1
|
||||
|
||||
# Second run — unchanged, pass root_folder_id from first run
|
||||
count2, _, _, _ = await index_local_folder(
|
||||
session=db_session,
|
||||
search_space_id=db_search_space.id,
|
||||
|
|
@ -151,12 +109,7 @@ class TestFullIndexer:
|
|||
).scalar_one()
|
||||
assert total == 1
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_i3_changed_reindexed(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -178,9 +131,7 @@ class TestFullIndexer:
|
|||
folder_name="test-folder",
|
||||
)
|
||||
|
||||
# Modify
|
||||
f.write_text("# Version 2\n\nUpdated.")
|
||||
# Touch mtime to ensure it's detected as different
|
||||
os.utime(f, (f.stat().st_atime + 10, f.stat().st_mtime + 10))
|
||||
|
||||
count, _, _, _ = await index_local_folder(
|
||||
|
|
@ -193,7 +144,6 @@ class TestFullIndexer:
|
|||
)
|
||||
assert count == 1
|
||||
|
||||
# Should have a version snapshot
|
||||
versions = (
|
||||
await db_session.execute(
|
||||
select(DocumentVersion).join(Document).where(
|
||||
|
|
@ -204,12 +154,7 @@ class TestFullIndexer:
|
|||
).scalars().all()
|
||||
assert len(versions) >= 1
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_i4_deleted_removed(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -262,12 +207,7 @@ class TestFullIndexer:
|
|||
).scalar_one()
|
||||
assert docs_after == 0
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_i5_single_file_mode(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -305,18 +245,13 @@ class TestFullIndexer:
|
|||
|
||||
|
||||
# ====================================================================
|
||||
# Tier 4: Folder Mirroring (F1-F5)
|
||||
# Tier 4: Folder Mirroring (F1-F7)
|
||||
# ====================================================================
|
||||
|
||||
|
||||
class TestFolderMirroring:
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_f1_root_folder_created(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -344,12 +279,7 @@ class TestFolderMirroring:
|
|||
).scalar_one()
|
||||
assert root_folder.name == "test-folder"
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_f2_nested_folder_rows(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -393,12 +323,7 @@ class TestFolderMirroring:
|
|||
assert daily_folder.parent_id == notes_folder.id
|
||||
assert weekly_folder.parent_id == notes_folder.id
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_f3_resync_reuses_folders(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -428,7 +353,6 @@ class TestFolderMirroring:
|
|||
).scalars().all()
|
||||
ids_before = {f.id for f in folders_before}
|
||||
|
||||
# Re-sync with root_folder_id from first run
|
||||
await index_local_folder(
|
||||
session=db_session,
|
||||
search_space_id=db_search_space.id,
|
||||
|
|
@ -447,12 +371,7 @@ class TestFolderMirroring:
|
|||
|
||||
assert ids_before == ids_after
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_f4_folder_id_assigned(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -496,15 +415,9 @@ class TestFolderMirroring:
|
|||
|
||||
assert today_doc.folder_id == daily_folder.id
|
||||
|
||||
# Root doc should be in the root folder
|
||||
assert root_doc.folder_id == root_folder_id
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_f5_empty_folder_cleanup(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -531,7 +444,6 @@ class TestFolderMirroring:
|
|||
folder_name="test-folder",
|
||||
)
|
||||
|
||||
# Verify weekly folder exists
|
||||
weekly_folder = (
|
||||
await db_session.execute(
|
||||
select(Folder).where(Folder.name == "weekly")
|
||||
|
|
@ -539,7 +451,6 @@ class TestFolderMirroring:
|
|||
).scalar_one_or_none()
|
||||
assert weekly_folder is not None
|
||||
|
||||
# Delete weekly directory + its file
|
||||
shutil.rmtree(weekly)
|
||||
|
||||
await index_local_folder(
|
||||
|
|
@ -551,7 +462,6 @@ class TestFolderMirroring:
|
|||
root_folder_id=root_folder_id,
|
||||
)
|
||||
|
||||
# weekly Folder should be gone (empty, dir removed)
|
||||
weekly_after = (
|
||||
await db_session.execute(
|
||||
select(Folder).where(Folder.name == "weekly")
|
||||
|
|
@ -559,7 +469,6 @@ class TestFolderMirroring:
|
|||
).scalar_one_or_none()
|
||||
assert weekly_after is None
|
||||
|
||||
# daily should still exist
|
||||
daily_after = (
|
||||
await db_session.execute(
|
||||
select(Folder).where(Folder.name == "daily")
|
||||
|
|
@ -567,12 +476,7 @@ class TestFolderMirroring:
|
|||
).scalar_one_or_none()
|
||||
assert daily_after is not None
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_f6_single_file_creates_subfolder(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -634,12 +538,7 @@ class TestFolderMirroring:
|
|||
assert daily_folder.parent_id == notes_folder.id
|
||||
assert notes_folder.parent_id == root_folder_id
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_self_hosted",
|
||||
"patched_embed_for_indexer",
|
||||
"patched_chunks_for_indexer",
|
||||
"patched_summary_for_indexer",
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_f7_single_file_delete_cleans_empty_folders(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
@ -705,9 +604,7 @@ class TestFolderMirroring:
|
|||
|
||||
class TestPipelineIntegration:
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_summarize", "patched_embed_texts", "patched_chunk_text"
|
||||
)
|
||||
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
|
||||
async def test_p1_local_folder_file_through_pipeline(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue