feat: add integration and unit tests for local folder indexing and document versioning

This commit is contained in:
Anish Sarkar 2026-04-02 11:12:16 +05:30
parent 96a58d0d30
commit 775dea7894
4 changed files with 894 additions and 0 deletions

View file

@ -166,3 +166,24 @@ def make_connector_document(db_connector, db_user):
return ConnectorDocument(**defaults)
return _make
@pytest_asyncio.fixture
async def db_local_folder_connector(
    db_session: AsyncSession, db_user: User, db_search_space: SearchSpace, tmp_path
) -> SearchSourceConnector:
    """Flush (not commit) a LOCAL_FOLDER connector whose root is ``tmp_path``."""
    folder_config = {
        "folder_path": str(tmp_path),
        "folder_name": "test-folder",
        "exclude_patterns": [],
        "file_extensions": None,
    }
    conn = SearchSourceConnector(
        name="Test Local Folder",
        connector_type=SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR,
        config=folder_config,
        search_space_id=db_search_space.id,
        user_id=db_user.id,
    )
    db_session.add(conn)
    await db_session.flush()
    return conn

View file

@ -0,0 +1,609 @@
"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F5), Tier 5 (P1)."""
import os
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import (
Document,
DocumentStatus,
DocumentType,
DocumentVersion,
Folder,
SearchSourceConnector,
SearchSpace,
User,
)
import app.tasks.connector_indexers.local_folder_indexer as _lfi_mod
pytestmark = pytest.mark.integration
@pytest.fixture
def patched_self_hosted(monkeypatch):
    """Swap the indexer module's config for one that reports self-hosted mode."""

    class _FakeConfig:
        @staticmethod
        def is_self_hosted():
            return True

    monkeypatch.setattr(_lfi_mod, "config", _FakeConfig())
@pytest.fixture
def patched_embed_for_indexer(monkeypatch):
    """Replace embed_text with a MagicMock yielding a constant-valued vector.

    The vector length matches the configured embedding model's dimension so
    downstream pgvector columns accept it. Returns the mock for call asserts.
    """
    from app.config import config as app_config

    fake_embed = MagicMock(
        return_value=[0.1] * app_config.embedding_model_instance.dimension
    )
    monkeypatch.setattr(_lfi_mod, "embed_text", fake_embed)
    return fake_embed
@pytest.fixture
def patched_chunks_for_indexer(monkeypatch):
    """Stub create_document_chunks with a coroutine returning one fixed Chunk."""
    from app.config import config as app_config
    from app.db import Chunk

    embedding_dim = app_config.embedding_model_instance.dimension

    async def _fake_create_chunks(text):
        # Content is irrelevant to the indexer tests; only shape matters.
        return [Chunk(content="chunk", embedding=[0.1] * embedding_dim)]

    monkeypatch.setattr(_lfi_mod, "create_document_chunks", _fake_create_chunks)
@pytest.fixture
def patched_summary_for_indexer(monkeypatch):
    """Disable LLM summarisation: get_user_long_context_llm resolves to None."""
    monkeypatch.setattr(
        _lfi_mod, "get_user_long_context_llm", AsyncMock(return_value=None)
    )
# ====================================================================
# Tier 3: Full Indexer Integration (I1-I5)
# ====================================================================
class TestFullIndexer:
    """Tier 3 (I1-I5): end-to-end runs of index_local_folder against tmp_path.

    The four ``patched_*`` fixtures stub out the deployment check, embeddings,
    chunking and summarisation, so only the indexer's own orchestration
    (scanning, change detection, persistence) is exercised.
    """

    @staticmethod
    async def _run_indexer(session, connector, search_space, user, **kwargs):
        """Invoke index_local_folder with the standard argument plumbing.

        Returns the indexer's ``(indexed_count, skipped_count, error)`` tuple.
        Extra keyword arguments (e.g. ``target_file_path``) are forwarded.
        """
        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder

        return await index_local_folder(
            session=session,
            connector_id=connector.id,
            search_space_id=search_space.id,
            user_id=str(user.id),
            **kwargs,
        )

    @staticmethod
    async def _connector_docs(session, connector):
        """All Document rows belonging to *connector*."""
        result = await session.execute(
            select(Document).where(Document.connector_id == connector.id)
        )
        return result.scalars().all()

    @staticmethod
    async def _doc_count(session, connector):
        """COUNT(*) of Document rows belonging to *connector*."""
        result = await session.execute(
            select(func.count())
            .select_from(Document)
            .where(Document.connector_id == connector.id)
        )
        return result.scalar_one()

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_i1_new_file_indexed(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """I1: Single new .md file is indexed with status READY."""
        (tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
        count, skipped, err = await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        assert err is None
        assert count == 1
        docs = await self._connector_docs(db_session, db_local_folder_connector)
        assert len(docs) == 1
        assert docs[0].document_type == DocumentType.LOCAL_FOLDER_FILE
        assert DocumentStatus.is_state(docs[0].status, DocumentStatus.READY)

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_i2_unchanged_skipped(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """I2: Second run on unchanged directory creates no new documents."""
        (tmp_path / "note.md").write_text("# Hello\n\nSame content.")
        count1, _, _ = await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        assert count1 == 1
        # Second run — the unchanged directory must be a no-op.
        count2, _, _ = await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        assert count2 == 0
        assert await self._doc_count(db_session, db_local_folder_connector) == 1

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_i3_changed_reindexed(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """I3: Modified file content triggers re-index and creates a version."""
        note = tmp_path / "note.md"
        note.write_text("# Version 1\n\nOriginal.")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        note.write_text("# Version 2\n\nUpdated.")
        # Bump atime/mtime by 10s so change detection fires even on
        # filesystems with coarse timestamp resolution.
        stat = note.stat()
        os.utime(note, (stat.st_atime + 10, stat.st_mtime + 10))
        count, _, _ = await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        assert count == 1
        # Re-indexing must snapshot the previous content as a version.
        versions = (
            (
                await db_session.execute(
                    select(DocumentVersion)
                    .join(Document)
                    .where(Document.connector_id == db_local_folder_connector.id)
                )
            )
            .scalars()
            .all()
        )
        assert len(versions) >= 1

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_i4_deleted_removed(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """I4: Deleted file is removed from DB on re-sync."""
        doomed = tmp_path / "to_delete.md"
        doomed.write_text("# Delete me")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        assert await self._doc_count(db_session, db_local_folder_connector) == 1
        doomed.unlink()
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        assert await self._doc_count(db_session, db_local_folder_connector) == 0

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_i5_single_file_mode(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """I5: Single-file mode only processes the specified file."""
        for name, body in (("a.md", "File A"), ("b.md", "File B"), ("c.md", "File C")):
            (tmp_path / name).write_text(body)
        count, _, _ = await self._run_indexer(
            db_session,
            db_local_folder_connector,
            db_search_space,
            db_user,
            target_file_path=str(tmp_path / "b.md"),
        )
        assert count == 1
        docs = await self._connector_docs(db_session, db_local_folder_connector)
        assert len(docs) == 1
        assert docs[0].title == "b"
# ====================================================================
# Tier 4: Folder Mirroring (F1-F5)
# ====================================================================
class TestFolderMirroring:
    """Tier 4 (F1-F5): the indexer mirrors the on-disk tree as Folder rows."""

    @staticmethod
    async def _run_indexer(session, connector, search_space, user):
        """Invoke index_local_folder with the standard argument plumbing."""
        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder

        return await index_local_folder(
            session=session,
            connector_id=connector.id,
            search_space_id=search_space.id,
            user_id=str(user.id),
        )

    @staticmethod
    async def _space_folders(session, search_space):
        """All Folder rows in *search_space*."""
        result = await session.execute(
            select(Folder).where(Folder.search_space_id == search_space.id)
        )
        return result.scalars().all()

    @staticmethod
    async def _folder_named(session, name):
        """The single Folder named *name*, or None if absent."""
        result = await session.execute(select(Folder).where(Folder.name == name))
        return result.scalar_one_or_none()

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_f1_root_folder_created(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """F1: First sync creates a root Folder and stores root_folder_id."""
        (tmp_path / "root.md").write_text("Root file")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        # Reload the connector — the indexer writes root_folder_id into config.
        await db_session.refresh(db_local_folder_connector)
        root_id = db_local_folder_connector.config.get("root_folder_id")
        assert root_id is not None
        root_folder = (
            await db_session.execute(select(Folder).where(Folder.id == root_id))
        ).scalar_one()
        assert root_folder.name == "test-folder"

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_f2_nested_folder_rows(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """F2: Nested dirs create Folder rows with correct parent_id chain."""
        for leaf in ("daily", "weekly"):
            (tmp_path / "notes" / leaf).mkdir(parents=True)
        (tmp_path / "notes" / "daily" / "today.md").write_text("today")
        (tmp_path / "notes" / "weekly" / "review.md").write_text("review")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        folders = await self._space_folders(db_session, db_search_space)
        by_name = {f.name: f for f in folders}
        assert {"notes", "daily", "weekly"} <= by_name.keys()
        # Both leaves must hang off the intermediate "notes" folder.
        assert by_name["daily"].parent_id == by_name["notes"].id
        assert by_name["weekly"].parent_id == by_name["notes"].id

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_f3_resync_reuses_folders(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """F3: Re-sync reuses existing Folder rows, no duplicates."""
        sub = tmp_path / "docs"
        sub.mkdir()
        (sub / "file.md").write_text("content")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        ids_before = {
            f.id for f in await self._space_folders(db_session, db_search_space)
        }
        # Re-sync with an unchanged tree must keep the exact same Folder ids.
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        ids_after = {
            f.id for f in await self._space_folders(db_session, db_search_space)
        }
        assert ids_before == ids_after

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_f4_folder_id_assigned(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """F4: Documents get correct folder_id based on their directory."""
        daily = tmp_path / "notes" / "daily"
        daily.mkdir(parents=True)
        (daily / "today.md").write_text("today note")
        (tmp_path / "root.md").write_text("root note")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        docs = (
            (
                await db_session.execute(
                    select(Document).where(
                        Document.connector_id == db_local_folder_connector.id
                    )
                )
            )
            .scalars()
            .all()
        )
        by_title = {d.title: d for d in docs}
        daily_folder = await self._folder_named(db_session, "daily")
        assert daily_folder is not None
        assert by_title["today"].folder_id == daily_folder.id
        # Files at the folder root land in the root Folder recorded on the connector.
        await db_session.refresh(db_local_folder_connector)
        root_fid = db_local_folder_connector.config.get("root_folder_id")
        assert by_title["root"].folder_id == root_fid

    @pytest.mark.usefixtures(
        "patched_self_hosted",
        "patched_embed_for_indexer",
        "patched_chunks_for_indexer",
        "patched_summary_for_indexer",
    )
    async def test_f5_empty_folder_cleanup(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """F5: Deleted dir's empty Folder row is cleaned up on re-sync."""
        import shutil

        for leaf in ("daily", "weekly"):
            (tmp_path / "notes" / leaf).mkdir(parents=True)
        (tmp_path / "notes" / "daily" / "today.md").write_text("today")
        (tmp_path / "notes" / "weekly" / "review.md").write_text("review")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        assert await self._folder_named(db_session, "weekly") is not None
        # Remove the directory (and its only file) from disk, then re-sync.
        shutil.rmtree(tmp_path / "notes" / "weekly")
        await self._run_indexer(
            db_session, db_local_folder_connector, db_search_space, db_user
        )
        # The now-empty "weekly" Folder must be gone; "daily" survives.
        assert await self._folder_named(db_session, "weekly") is None
        assert await self._folder_named(db_session, "daily") is not None
# ====================================================================
# Tier 5: Pipeline Integration (P1)
# ====================================================================
class TestPipelineIntegration:
    """Tier 5 (P1): a LOCAL_FOLDER_FILE document travels the shared pipeline."""

    @pytest.mark.usefixtures(
        "patched_summarize", "patched_embed_texts", "patched_chunk_text"
    )
    async def test_p1_local_folder_file_through_pipeline(
        self,
        db_session: AsyncSession,
        db_local_folder_connector: SearchSourceConnector,
        db_user: User,
        db_search_space: SearchSpace,
        mocker,
    ):
        """P1: LOCAL_FOLDER_FILE ConnectorDocument through prepare+index to READY."""
        from app.indexing_pipeline.connector_document import ConnectorDocument
        from app.indexing_pipeline.indexing_pipeline_service import (
            IndexingPipelineService,
        )

        connector_doc = ConnectorDocument(
            title="Test Local File",
            source_markdown="## Local file\n\nContent from disk.",
            unique_id="test-folder:test.md",
            document_type=DocumentType.LOCAL_FOLDER_FILE,
            search_space_id=db_search_space.id,
            connector_id=db_local_folder_connector.id,
            created_by_id=str(db_user.id),
        )
        pipeline = IndexingPipelineService(session=db_session)
        prepared = await pipeline.prepare_for_indexing([connector_doc])
        assert len(prepared) == 1
        indexed = await pipeline.index(prepared[0], connector_doc, llm=mocker.Mock())
        assert indexed is not None
        stored = (
            (
                await db_session.execute(
                    select(Document).where(
                        Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
                        Document.search_space_id == db_search_space.id,
                    )
                )
            )
            .scalars()
            .all()
        )
        assert len(stored) == 1
        assert DocumentStatus.is_state(stored[0].status, DocumentStatus.READY)

View file

@ -0,0 +1,184 @@
"""Integration tests for document versioning snapshot + cleanup."""
from datetime import UTC, datetime, timedelta
import pytest
import pytest_asyncio
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType, DocumentVersion, SearchSpace, User
pytestmark = pytest.mark.integration
@pytest_asyncio.fixture
async def db_document(
    db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
) -> Document:
    """Flush (not commit) a minimal LOCAL_FOLDER_FILE Document to snapshot."""
    document = Document(
        title="Test Doc",
        document_type=DocumentType.LOCAL_FOLDER_FILE,
        document_metadata={},
        content="Summary of test doc.",
        content_hash="abc123",
        unique_identifier_hash="local_folder:test-folder:test.md",
        source_markdown="# Test\n\nOriginal content.",
        search_space_id=db_search_space.id,
        created_by_id=db_user.id,
    )
    db_session.add(document)
    await db_session.flush()
    return document
async def _version_count(session: AsyncSession, document_id: int) -> int:
    """Number of DocumentVersion rows attached to *document_id*."""
    stmt = (
        select(func.count())
        .select_from(DocumentVersion)
        .where(DocumentVersion.document_id == document_id)
    )
    return (await session.execute(stmt)).scalar_one()
async def _get_versions(session: AsyncSession, document_id: int) -> list[DocumentVersion]:
    """All versions of *document_id*, ordered by ascending version_number."""
    stmt = (
        select(DocumentVersion)
        .where(DocumentVersion.document_id == document_id)
        .order_by(DocumentVersion.version_number)
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(rows)
class TestCreateVersionSnapshot:
    """V1-V5: TDD slices for create_version_snapshot."""

    @staticmethod
    def _freeze_now(monkeypatch, moment):
        """Pin app.utils.document_versioning._now to return *moment*.

        The default argument binds *moment* eagerly, avoiding the classic
        late-binding-closure bug when called repeatedly inside a loop.
        """
        monkeypatch.setattr(
            "app.utils.document_versioning._now", lambda _m=moment: _m
        )

    async def test_v1_creates_first_version(self, db_session, db_document):
        """V1: First snapshot creates version 1 with the document's current state."""
        from app.utils.document_versioning import create_version_snapshot

        await create_version_snapshot(db_session, db_document)
        versions = await _get_versions(db_session, db_document.id)
        assert len(versions) == 1
        first = versions[0]
        assert first.version_number == 1
        assert first.source_markdown == "# Test\n\nOriginal content."
        assert first.content_hash == "abc123"
        assert first.title == "Test Doc"
        assert first.document_id == db_document.id

    async def test_v2_creates_version_2_after_30_min(
        self, db_session, db_document, monkeypatch
    ):
        """V2: After 30+ minutes, a new version is created (not overwritten)."""
        from app.utils.document_versioning import create_version_snapshot

        t0 = datetime(2025, 1, 1, 12, 0, 0, tzinfo=UTC)
        self._freeze_now(monkeypatch, t0)
        await create_version_snapshot(db_session, db_document)
        # Simulate a content change with >30 minutes elapsed.
        db_document.source_markdown = "# Test\n\nUpdated content."
        db_document.content_hash = "def456"
        self._freeze_now(monkeypatch, t0 + timedelta(minutes=31))
        await create_version_snapshot(db_session, db_document)
        versions = await _get_versions(db_session, db_document.id)
        assert len(versions) == 2
        assert [v.version_number for v in versions] == [1, 2]
        assert versions[1].source_markdown == "# Test\n\nUpdated content."

    async def test_v3_overwrites_within_30_min(
        self, db_session, db_document, monkeypatch
    ):
        """V3: Within 30 minutes, the latest version is overwritten."""
        from app.utils.document_versioning import create_version_snapshot

        t0 = datetime(2025, 1, 1, 12, 0, 0, tzinfo=UTC)
        self._freeze_now(monkeypatch, t0)
        await create_version_snapshot(db_session, db_document)
        assert await _version_count(db_session, db_document.id) == 1
        # A quick edit 10 minutes later must update version 1 in place.
        db_document.source_markdown = "# Test\n\nQuick edit."
        db_document.content_hash = "quick123"
        self._freeze_now(monkeypatch, t0 + timedelta(minutes=10))
        await create_version_snapshot(db_session, db_document)
        assert await _version_count(db_session, db_document.id) == 1  # still 1, not 2
        versions = await _get_versions(db_session, db_document.id)
        assert versions[0].source_markdown == "# Test\n\nQuick edit."
        assert versions[0].content_hash == "quick123"

    async def test_v4_cleanup_90_day_old_versions(
        self, db_session, db_document, monkeypatch
    ):
        """V4: Versions older than 90 days are cleaned up."""
        from app.utils.document_versioning import create_version_snapshot

        base = datetime(2025, 1, 1, 12, 0, 0, tzinfo=UTC)
        # Five versions: the first three "old", the last two "recent".
        for i in range(5):
            db_document.source_markdown = f"Content v{i + 1}"
            db_document.content_hash = f"hash_{i + 1}"
            offset = timedelta(days=i) if i < 3 else timedelta(days=100 + i)
            self._freeze_now(monkeypatch, base + offset)
            await create_version_snapshot(db_session, db_document)
        # A sixth snapshot at day 200 makes the first three >90 days old and
        # should trigger their cleanup.
        now = base + timedelta(days=200)
        self._freeze_now(monkeypatch, now)
        db_document.source_markdown = "Content v6"
        db_document.content_hash = "hash_6"
        await create_version_snapshot(db_session, db_document)
        versions = await _get_versions(db_session, db_document.id)
        # Only versions 4, 5, 6 should remain; none older than 90 days.
        for v in versions:
            # NOTE(review): assumes created_at is persisted naive in UTC —
            # confirm against the DocumentVersion column definition.
            age = now - v.created_at.replace(tzinfo=UTC)
            assert age <= timedelta(days=90), f"Version {v.version_number} is too old"

    async def test_v5_cap_at_20_versions(
        self, db_session, db_document, monkeypatch
    ):
        """V5: More than 20 versions triggers cap — oldest gets deleted."""
        from app.utils.document_versioning import create_version_snapshot

        base = datetime(2025, 6, 1, 12, 0, 0, tzinfo=UTC)
        # 21 versions, each 31 minutes apart: far enough apart not to merge
        # (V3 behavior) and all inside the 90-day retention window (V4).
        for i in range(21):
            db_document.source_markdown = f"Content v{i + 1}"
            db_document.content_hash = f"hash_{i + 1}"
            self._freeze_now(monkeypatch, base + timedelta(minutes=31 * i))
            await create_version_snapshot(db_session, db_document)
        versions = await _get_versions(db_session, db_document.id)
        assert len(versions) == 20
        # Version 1 was the oldest and must have been evicted by the cap.
        assert versions[0].version_number == 2

View file

@ -0,0 +1,80 @@
"""Unit tests for scan_folder() pure logic — Tier 2 TDD slices (S1-S4)."""
from pathlib import Path
import pytest
pytestmark = pytest.mark.unit
class TestScanFolder:
    """S1-S4: scan_folder() with real tmp_path filesystem."""

    def test_s1_single_md_file(self, tmp_path: Path):
        """S1: scan_folder on a dir with one .md file returns correct entry."""
        from app.tasks.connector_indexers.local_folder_indexer import scan_folder

        md = tmp_path / "note.md"
        md.write_text("# Hello")
        results = scan_folder(str(tmp_path))
        assert len(results) == 1
        entry = results[0]
        assert entry["relative_path"] == "note.md"
        assert entry["size"] > 0
        assert "modified_at" in entry
        assert entry["path"] == str(md)

    def test_s2_extension_filter(self, tmp_path: Path):
        """S2: file_extensions filter returns only matching files."""
        from app.tasks.connector_indexers.local_folder_indexer import scan_folder

        (tmp_path / "a.md").write_text("md")
        (tmp_path / "b.txt").write_text("txt")
        (tmp_path / "c.pdf").write_bytes(b"%PDF")
        results = scan_folder(str(tmp_path), file_extensions=[".md"])
        assert {r["relative_path"] for r in results} == {"a.md"}

    def test_s3_exclude_patterns(self, tmp_path: Path):
        """S3: exclude_patterns skips files inside excluded directories."""
        from app.tasks.connector_indexers.local_folder_indexer import scan_folder

        (tmp_path / "good.md").write_text("good")
        node_modules = tmp_path / "node_modules"
        node_modules.mkdir()
        (node_modules / "dep.js").write_text("module")
        git_dir = tmp_path / ".git"
        git_dir.mkdir()
        (git_dir / "config").write_text("gitconfig")
        results = scan_folder(
            str(tmp_path), exclude_patterns=["node_modules", ".git"]
        )
        names = {r["relative_path"] for r in results}
        assert "good.md" in names
        assert not any("node_modules" in n for n in names)
        assert not any(".git" in n for n in names)

    def test_s4_nested_dirs(self, tmp_path: Path):
        """S4: nested subdirectories produce correct relative paths."""
        from app.tasks.connector_indexers.local_folder_indexer import scan_folder

        (tmp_path / "notes" / "daily").mkdir(parents=True)
        (tmp_path / "notes" / "weekly").mkdir(parents=True)
        (tmp_path / "notes" / "daily" / "today.md").write_text("today")
        (tmp_path / "notes" / "weekly" / "review.md").write_text("review")
        (tmp_path / "root.txt").write_text("root")
        results = scan_folder(str(tmp_path))
        # Normalise separators so the assertion is OS-independent: the
        # previous "a/b" in paths or "a\\b" in paths double-check collapses
        # into one exact comparison.
        paths = {Path(r["relative_path"]).as_posix() for r in results}
        assert "notes/daily/today.md" in paths
        assert "notes/weekly/review.md" in paths
        assert "root.txt" in paths