diff --git a/surfsense_backend/INDEXING_PIPELINE_PLAN.md b/surfsense_backend/INDEXING_PIPELINE_PLAN.md deleted file mode 100644 index fb9307e1b..000000000 --- a/surfsense_backend/INDEXING_PIPELINE_PLAN.md +++ /dev/null @@ -1,458 +0,0 @@ -# Indexing Pipeline Refactor — Plan - -## Context - -The current system has 20+ connector indexers each implementing the same pipeline -independently. This causes: - -- `safe_set_chunks`, `get_current_timestamp`, `check_document_by_unique_identifier` - defined twice across `connector_indexers/base.py` and `document_processors/base.py` -- `config.embedding_model_instance.embed()` called in 20+ places directly -- `document.content` stores different things per connector (summary vs raw text) -- `source_markdown` column always NULL — never written by any connector -- GitHub uses custom character chunking, bypassing the shared chunker -- Obsidian calls LLM then discards the result, embeds raw text instead -- `LinearKBSyncService` and `NotionKBSyncService` both say they - "mirror Phase 2 logic exactly" — a third copy of the same pipeline -- No tests anywhere - ---- - -## What We Are Building - -### 1. `ConnectorDocument` - -The canonical data model. Every adapter produces one. The pipeline consumes one. -Nothing else crosses this boundary. - -**File:** `app/pipeline/connector_document.py` - -``` -title: str -source_markdown: str # raw content, always set, validated not empty -unique_id: str # source-native key: page_id, issue_id, ts... -document_type: DocumentType -should_summarize: bool = True # drives SUMMARIZE step -content_type: "text" | "code" # drives CHUNK step (chunker vs code_chunker) -metadata: dict = {} -connector_id: int | None = None -search_space_id: int = 0 -created_by_id: str = "" -``` - -### 2. `IndexingPipelineService` - -The single pipeline. All connectors use it. Nothing changes inside it -when a new connector is added. - -**File:** `app/services/indexing_pipeline_service.py` - -**Public interface:** -``` -prepare_for_indexing(connector_documents) → list[(Document, ConnectorDocument)] -index(document, connector_doc, llm) → None -``` - -**Public interface:** -``` -prepare_for_indexing(connector_documents) → list[(Document, ConnectorDocument)] -index(document, connector_doc, llm) → None -``` - -**Private steps (one responsibility each):** -``` -_compute_unique_identifier_hash(connector_doc) → str -_compute_content_hash(connector_doc) → str -_find_existing_by_identifier(hash) → Document | None # always selectinload chunks -_find_existing_by_content(hash) → Document | None -_create(connector_doc, hash) → Document # creates new DB row -_mark_processing(document) → None # status transition -_mark_ready(document, connector_doc, ...) → None # status transition + full data -_mark_failed(document, error) → None # status transition + error -_summarize(connector_doc, llm) → str -_chunk(connector_doc) → list[str] -_embed_document(summary_content) → list[float] -_embed_chunks(segments) → list[list[float]] -``` - -**What `prepare_for_indexing` orchestrates:** -``` -for each ConnectorDocument: - _compute_unique_identifier_hash - _compute_content_hash - _find_existing_by_identifier - → not found: check content duplicate - → duplicate: skip - → new: _create - → found, content_hash same, title same: skip (unchanged) - → found, content_hash same, title different: update title inline - → found, content_hash different: queue for full reprocess -session.commit() once -``` - -**What `index` orchestrates:** -``` -try: - _mark_processing(document), commit - _summarize - _chunk - _embed_document - _embed_chunks - _mark_ready(document, connector_doc, ...) -except Exception as e: - rollback() - session.refresh(document) ← required: rollback expires all attributes - _mark_failed(document, e) - commit() -``` - -### 3. Adapters - -One per connector. Pure static converter. No IO, no DB, no hashing. - -**File:** `app/pipeline/adapters/{connector_name}_adapter.py` - -```python -class ClickUpAdapter: - @staticmethod - def to_connector_document(task: dict, connector_id, search_space_id, user_id) -> ConnectorDocument: - ... -``` - ---- - -## What Moves Into The Service - -| Currently in | Moves to | -|---|---| -| `connector_indexers/base.py` | `IndexingPipelineService` | -| `document_processors/base.py` | `IndexingPipelineService` (duplicates removed) | -| `document_converters.py` | `IndexingPipelineService` | -| `config.embedding_model_instance.embed()` (20 places) | `_embed_document`, `_embed_chunks` only | -| `config.chunker_instance.chunk()` (all connectors) | `_chunk` only | -| `config.code_chunker_instance.chunk()` (unused/GitHub) | `_chunk` only | - -**Stays outside (adapter concern):** -- `build_document_metadata_string()` -- `build_document_metadata_markdown()` -- `convert_element_to_markdown()` -- `convert_document_to_markdown()` - ---- - -## Implementation Order - -### Step 1 — Create `ConnectorDocument` -New file. Nothing references it yet. Zero risk. - -### Step 2 — Create `IndexingPipelineService` -New file. Nothing calls it yet. Zero risk. - -### Step 3 — Write tests (see test plan below) -Tests call the new service directly. No connector changed yet. - -### Step 4 — Migrate one connector (ClickUp — simplest) -- Write `ClickUpAdapter` -- Update `clickup_indexer.py` to use the service -- Keep old code until verified working in testing - -### Step 5 — Migrate remaining connectors one by one -Each connector migrated = old Phase 2 code deleted from that indexer. - -### Step 6 — Update `LinearKBSyncService` and `NotionKBSyncService` -Replace their Phase 2 reimplementation with a call to `service.index()`. - -### Step 7 — Update `composio_indexer.py` -Standalone indexer for Composio connectors (`composio_google_drive_connector.py`, -`composio_google_calendar_connector.py`, `composio_gmail_connector.py`). -Same migration pattern as standard connectors. - -### Step 8 — Delete duplicate base functions -Only after all connectors are migrated. -`connector_indexers/base.py` and `document_processors/base.py` — remove duplicates, -keep only connector helpers (`get_connector_by_id`, `calculate_date_range`, etc.) - ---- - -## Session Management Rules - -These rules apply to the pipeline implementation. They replace the inconsistent patterns -found across the 20+ existing connectors. - -### No `session.no_autoflush` -All connectors used `with session.no_autoflush` before `_find_existing_by_content`. -This is cargo-cult. `session.add(document)` happens AFTER the content_hash check, -so no staged documents exist in the session at check time. Autoflush is harmless. -For same-content same-batch items, autoflush correctly finds the first staged document -and deduplicates. Without `no_autoflush`, same-batch dedup works better, not worse. - -### Error handling in `index()` -After a rollback, SQLAlchemy expires all object attributes. Accessing any attribute -on `document` after rollback raises `MissingGreenlet` in async mode. -The correct pattern: -```python -except Exception as e: - await self.session.rollback() - await self.session.refresh(document) # reload from DB before touching attributes - await self._write_failed(document, str(e)) - await self.session.commit() -``` - -### `safe_set_chunks` requires `selectinload` -`safe_set_chunks` uses `set_committed_value` which marks old chunks as orphans for -cascade deletion. This only works if the old chunks were loaded into the session first. -`_find_existing_by_identifier` MUST always use `selectinload(Document.chunks)`. -Any call to `_write_ready` on a document not loaded with `selectinload` will silently -accumulate chunks instead of replacing them. - -### Phase 1 batch size -Not implemented upfront. If a connector proves to need it during migration, add a -`batch_size` parameter to `prepare_for_indexing` at that point. - -### `session.refresh()` is not needed after Phase 1 commit -After Phase 1's commit, `index()` only SETs attributes on the document — it never -READs them. SQLAlchemy write operations do not trigger lazy loading. No refresh needed. - ---- - -## Connector Deviations — Handled at Migration Time - -| Connector | Deviation | How handled | -|---|---|---| -| Webcrawler | Content unknown at Phase 1 — fetched during Phase 2 | Call `prepare_for_indexing` per URL after crawling, not in bulk | -| Slack, Discord | No LLM summarization | `should_summarize = False` in `ConnectorDocument` | -| GitHub | Custom 4000-char character chunking | `content_type = "code"` → `code_chunker_instance` | -| Obsidian | LLM called but result discarded, raw content embedded | `should_summarize = True` → pipeline fixes this correctly | - ---- - -## Testing Philosophy - -Two rules drive every decision below: - -**1. Test behavior through public interfaces, not implementation details.** -`IndexingPipelineService` exposes two public methods: `prepare_for_indexing` and -`index`. Private methods (`_create`, `_summarize`, `_mark_ready`, etc.) are -implementation details. Testing them directly couples tests to internal structure — -renaming a private method breaks tests even though behavior hasn't changed. - -**2. Mock only true external boundaries.** -Mocking internal collaborators (session, chunker) tests your assumptions about -those collaborators, not the actual code. A mock never fails because of a bug in -the real system. The DB is not mocked — a real test DB is used. Only the LLM and -embedding model are mocked: they are genuine external services (cost, latency, -non-determinism, network flakiness). - -**Consequence:** most tests are integration tests. Unit tests cover only pure -functions and value objects that have no dependencies. - ---- - -## Test Plan - -### Unit tests — pure functions and value objects, no mocks - -#### `ConnectorDocument` - -| Test | Checks | -|---|---| -| `source_markdown` empty → validation error | rejection of invalid input | -| `source_markdown` whitespace only → validation error | rejection of invalid input | -| Valid document created with defaults | all default values correct | -| `should_summarize=False` accepted | optional flag works | -| `content_type="code"` accepted | optional flag works | - -#### `_compute_unique_identifier_hash` + `_compute_content_hash` - -| Test | Checks | -|---|---| -| Same input → same hash (both functions) | determinism | -| Different `unique_id` → different hash | isolation by field | -| Different `search_space_id` → different hash | isolation by space | -| Different `document_type` → different hash | isolation by type | -| Same content, different space → different hash | space-scoped content dedup | -| Different content → different hash | content sensitivity | - -#### Adapters - -Each adapter is a pure static converter. Zero mocks needed. - -| Test | Checks | -|---|---| -| `to_connector_document` returns valid `ConnectorDocument` | output type | -| `unique_id` maps to correct source field | field mapping | -| `document_type` is correct enum value | enum correctness | -| `source_markdown` contains expected content | content mapping | -| `should_summarize` set correctly for this connector | flag correctness | -| `content_type` set correctly (code vs text) | flag correctness | -| Empty/missing fields handled gracefully | robustness | - ---- - -### Integration tests — real DB, LLM and embedding model mocked - -#### `prepare_for_indexing` - -| Test | Checks | -|---|---| -| New document → status=pending in DB, returned | `_create` + hash logic | -| Same `content_hash`, same title → skipped, not returned | unchanged dedup | -| Same `content_hash`, different title → title updated, not returned | title-only update | -| Changed `content_hash` → existing doc returned for reprocessing | update detection | -| Duplicate content hash within batch → skipped | same-batch dedup | -| Multiple documents → one commit for all | batch commit | - -#### `index` - -| Test | Mocks | Checks | -|---|---|---| -| `document.status = ready` after call | llm, embedding | happy path | -| `document.content = summary` when `should_summarize=True` | llm, embedding | summarization used | -| `document.content = source_markdown` when `should_summarize=False` | embedding | summarization skipped | -| `document.source_markdown` always set | llm, embedding | field always written | -| Chunks written to DB | llm, embedding | chunking + persistence | -| `document.status = processing` set before heavy work | llm (slow), embedding | status progression | -| LLM raises → `document.status = failed` | llm (raises) | error handling | -| After LLM error → not stuck in `processing` | llm (raises) | recovery correctness | -| After LLM error → chunks NOT written | llm (raises) | no partial data | - ---- - -## What Does NOT Need Tests - -| Component | Reason | -|---|---| -| Private methods of `IndexingPipelineService` | covered by integration tests on public interface | -| Celery task wiring | orchestration only, no logic | -| External API calls (Notion, Slack, etc.) | external dependency, not our logic | -| `get_user_long_context_llm` | existing service, not our code | -| `calculate_date_range` | already has a `# FIX:` comment — needs its own separate tests | -| Full end-to-end indexer run | too many moving parts, not unit or integration | - ---- - -## Files To Create - -``` -app/indexing_pipeline/ - connector_document.py ← ConnectorDocument model - adapters/ - clickup_adapter.py ← first adapter (pilot) - notion_adapter.py - slack_adapter.py - ... - google/ ← created when 2nd Google connector is migrated - shared.py ← shared pure helpers (metadata, markdown conversion) - google_drive_adapter.py - google_docs_adapter.py - -app/clients/ ← created when 2nd connector of a family is migrated - google_client.py ← shared OAuth + API client for all Google connectors - -app/indexing_pipeline/ - indexing_pipeline_service.py ← IndexingPipelineService - -tests/ - unit/ - indexing_pipeline/ - test_connector_document.py - test_compute_hashes.py - adapters/ - test_clickup_adapter.py - test_notion_adapter.py - test_slack_adapter.py - integration/ - indexing_pipeline/ - test_prepare_for_indexing.py - test_index.py -``` - -## TDD Build Order - -One test → one implementation → repeat. Never write multiple tests before making -the previous one green. - ---- - -### Phase 1 — `ConnectorDocument` (unit) - -**File:** `tests/unit/indexing_pipeline/test_connector_document.py` -**Implements:** `app/indexing_pipeline/connector_document.py` - -- `test_empty_source_markdown_raises` -- `test_whitespace_only_source_markdown_raises` -- `test_valid_document_created_with_defaults` -- `test_should_summarize_false_accepted` -- `test_content_type_code_accepted` - -→ After this phase: add `make_connector_document` factory fixture to `tests/conftest.py` - ---- - -### Phase 2 — Hash functions (unit) - -**File:** `tests/unit/indexing_pipeline/test_compute_hashes.py` -**Implements:** `_compute_unique_identifier_hash`, `_compute_content_hash` on `IndexingPipelineService` - -- `test_same_input_same_hash` -- `test_different_unique_id_different_hash` -- `test_different_search_space_id_different_hash` -- `test_different_document_type_different_hash` -- `test_same_content_same_space_same_hash` -- `test_same_content_different_space_different_hash` -- `test_different_content_different_hash` - ---- - -### Phase 3 — `prepare_for_indexing` (integration) - -**File:** `tests/integration/indexing_pipeline/test_prepare_for_indexing.py` -**Implements:** `prepare_for_indexing` + all private DB methods it uses - -- `test_new_document_written_as_pending` -- `test_unchanged_document_skipped` -- `test_title_only_change_updates_title` -- `test_changed_content_queued_for_reprocessing` -- `test_duplicate_content_in_batch_skipped` -- `test_multiple_documents_one_commit` - ---- - -### Phase 4 — `index` (integration) - -**File:** `tests/integration/indexing_pipeline/test_index.py` -**Implements:** `index` + all private processing methods it uses - -- `test_sets_status_ready` -- `test_content_is_summary_when_should_summarize_true` -- `test_content_is_source_markdown_when_should_summarize_false` -- `test_source_markdown_always_set` -- `test_chunks_written_to_db` -- `test_status_processing_set_before_heavy_work` -- `test_llm_error_sets_status_failed` -- `test_llm_error_document_not_stuck_in_processing` -- `test_llm_error_chunks_not_written` - ---- - -### Phase 5 — Adapters (unit, one file per connector) - -Start with ClickUp — simplest connector, no special deviations. - -**File:** `tests/unit/adapters/test_clickup_adapter.py` -**Implements:** `app/pipeline/adapters/clickup_adapter.py` - -Then repeat for each connector as it is migrated. - ---- - -**Total: 12 unit tests + 15 integration tests = 27 tests.** - ---- - -## Out of Scope (for now) - -Document processors (`youtube_processor`, `file_processors`, `markdown_processor`, -`extension_processor`, `circleback_processor`) follow the same 2-phase pattern and -will eventually migrate to the pipeline. Excluded from this refactor — they handle -user uploads, not connector syncs, and their migration is a separate task.