mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-06 20:15:17 +02:00
chore: cleanup
This commit is contained in:
parent
ab31bba3ad
commit
7a0b8838d1
1 changed files with 0 additions and 458 deletions
|
|
@ -1,458 +0,0 @@
|
|||
# Indexing Pipeline Refactor — Plan
|
||||
|
||||
## Context
|
||||
|
||||
The current system has 20+ connector indexers each implementing the same pipeline
|
||||
independently. This causes:
|
||||
|
||||
- `safe_set_chunks`, `get_current_timestamp`, `check_document_by_unique_identifier`
|
||||
defined twice across `connector_indexers/base.py` and `document_processors/base.py`
|
||||
- `config.embedding_model_instance.embed()` called in 20+ places directly
|
||||
- `document.content` stores different things per connector (summary vs raw text)
|
||||
- `source_markdown` column always NULL — never written by any connector
|
||||
- GitHub uses custom character chunking, bypassing the shared chunker
|
||||
- Obsidian calls LLM then discards the result, embeds raw text instead
|
||||
- `LinearKBSyncService` and `NotionKBSyncService` both say they
|
||||
"mirror Phase 2 logic exactly" — a third copy of the same pipeline
|
||||
- No tests anywhere
|
||||
|
||||
---
|
||||
|
||||
## What We Are Building
|
||||
|
||||
### 1. `ConnectorDocument`
|
||||
|
||||
The canonical data model. Every adapter produces one. The pipeline consumes one.
|
||||
Nothing else crosses this boundary.
|
||||
|
||||
**File:** `app/pipeline/connector_document.py`
|
||||
|
||||
```
|
||||
title: str
|
||||
source_markdown: str # raw content, always set, validated not empty
|
||||
unique_id: str # source-native key: page_id, issue_id, ts...
|
||||
document_type: DocumentType
|
||||
should_summarize: bool = True # drives SUMMARIZE step
|
||||
content_type: "text" | "code" # drives CHUNK step (chunker vs code_chunker)
|
||||
metadata: dict = {}
|
||||
connector_id: int | None = None
|
||||
search_space_id: int = 0
|
||||
created_by_id: str = ""
|
||||
```
|
||||
|
||||
### 2. `IndexingPipelineService`
|
||||
|
||||
The single pipeline. All connectors use it. Nothing changes inside it
|
||||
when a new connector is added.
|
||||
|
||||
**File:** `app/services/indexing_pipeline_service.py`
|
||||
|
||||
**Public interface:**
|
||||
```
|
||||
prepare_for_indexing(connector_documents) → list[(Document, ConnectorDocument)]
|
||||
index(document, connector_doc, llm) → None
|
||||
```
|
||||
|
||||
**Public interface:**
|
||||
```
|
||||
prepare_for_indexing(connector_documents) → list[(Document, ConnectorDocument)]
|
||||
index(document, connector_doc, llm) → None
|
||||
```
|
||||
|
||||
**Private steps (one responsibility each):**
|
||||
```
|
||||
_compute_unique_identifier_hash(connector_doc) → str
|
||||
_compute_content_hash(connector_doc) → str
|
||||
_find_existing_by_identifier(hash) → Document | None # always selectinload chunks
|
||||
_find_existing_by_content(hash) → Document | None
|
||||
_create(connector_doc, hash) → Document # creates new DB row
|
||||
_mark_processing(document) → None # status transition
|
||||
_mark_ready(document, connector_doc, ...) → None # status transition + full data
|
||||
_mark_failed(document, error) → None # status transition + error
|
||||
_summarize(connector_doc, llm) → str
|
||||
_chunk(connector_doc) → list[str]
|
||||
_embed_document(summary_content) → list[float]
|
||||
_embed_chunks(segments) → list[list[float]]
|
||||
```
|
||||
|
||||
**What `prepare_for_indexing` orchestrates:**
|
||||
```
|
||||
for each ConnectorDocument:
|
||||
_compute_unique_identifier_hash
|
||||
_compute_content_hash
|
||||
_find_existing_by_identifier
|
||||
→ not found: check content duplicate
|
||||
→ duplicate: skip
|
||||
→ new: _create
|
||||
→ found, content_hash same, title same: skip (unchanged)
|
||||
→ found, content_hash same, title different: update title inline
|
||||
→ found, content_hash different: queue for full reprocess
|
||||
session.commit() once
|
||||
```
|
||||
|
||||
**What `index` orchestrates:**
|
||||
```
|
||||
try:
|
||||
_mark_processing(document), commit
|
||||
_summarize
|
||||
_chunk
|
||||
_embed_document
|
||||
_embed_chunks
|
||||
_mark_ready(document, connector_doc, ...)
|
||||
except Exception as e:
|
||||
rollback()
|
||||
session.refresh(document) ← required: rollback expires all attributes
|
||||
_mark_failed(document, e)
|
||||
commit()
|
||||
```
|
||||
|
||||
### 3. Adapters
|
||||
|
||||
One per connector. Pure static converter. No IO, no DB, no hashing.
|
||||
|
||||
**File:** `app/pipeline/adapters/{connector_name}_adapter.py`
|
||||
|
||||
```python
|
||||
class ClickUpAdapter:
|
||||
@staticmethod
|
||||
def to_connector_document(task: dict, connector_id, search_space_id, user_id) -> ConnectorDocument:
|
||||
...
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## What Moves Into The Service
|
||||
|
||||
| Currently in | Moves to |
|
||||
|---|---|
|
||||
| `connector_indexers/base.py` | `IndexingPipelineService` |
|
||||
| `document_processors/base.py` | `IndexingPipelineService` (duplicates removed) |
|
||||
| `document_converters.py` | `IndexingPipelineService` |
|
||||
| `config.embedding_model_instance.embed()` (20 places) | `_embed_document`, `_embed_chunks` only |
|
||||
| `config.chunker_instance.chunk()` (all connectors) | `_chunk` only |
|
||||
| `config.code_chunker_instance.chunk()` (unused/GitHub) | `_chunk` only |
|
||||
|
||||
**Stays outside (adapter concern):**
|
||||
- `build_document_metadata_string()`
|
||||
- `build_document_metadata_markdown()`
|
||||
- `convert_element_to_markdown()`
|
||||
- `convert_document_to_markdown()`
|
||||
|
||||
---
|
||||
|
||||
## Implementation Order
|
||||
|
||||
### Step 1 — Create `ConnectorDocument`
|
||||
New file. Nothing references it yet. Zero risk.
|
||||
|
||||
### Step 2 — Create `IndexingPipelineService`
|
||||
New file. Nothing calls it yet. Zero risk.
|
||||
|
||||
### Step 3 — Write tests (see test plan below)
|
||||
Tests call the new service directly. No connector changed yet.
|
||||
|
||||
### Step 4 — Migrate one connector (ClickUp — simplest)
|
||||
- Write `ClickUpAdapter`
|
||||
- Update `clickup_indexer.py` to use the service
|
||||
- Keep old code until verified working in testing
|
||||
|
||||
### Step 5 — Migrate remaining connectors one by one
|
||||
Each connector migrated = old Phase 2 code deleted from that indexer.
|
||||
|
||||
### Step 6 — Update `LinearKBSyncService` and `NotionKBSyncService`
|
||||
Replace their Phase 2 reimplementation with a call to `service.index()`.
|
||||
|
||||
### Step 7 — Update `composio_indexer.py`
|
||||
Standalone indexer for Composio connectors (`composio_google_drive_connector.py`,
|
||||
`composio_google_calendar_connector.py`, `composio_gmail_connector.py`).
|
||||
Same migration pattern as standard connectors.
|
||||
|
||||
### Step 8 — Delete duplicate base functions
|
||||
Only after all connectors are migrated.
|
||||
`connector_indexers/base.py` and `document_processors/base.py` — remove duplicates,
|
||||
keep only connector helpers (`get_connector_by_id`, `calculate_date_range`, etc.)
|
||||
|
||||
---
|
||||
|
||||
## Session Management Rules
|
||||
|
||||
These rules apply to the pipeline implementation. They replace the inconsistent patterns
|
||||
found across the 20+ existing connectors.
|
||||
|
||||
### No `session.no_autoflush`
|
||||
All connectors used `with session.no_autoflush` before `_find_existing_by_content`.
|
||||
This is cargo-cult. `session.add(document)` happens AFTER the content_hash check,
|
||||
so no staged documents exist in the session at check time. Autoflush is harmless.
|
||||
For same-content same-batch items, autoflush correctly finds the first staged document
|
||||
and deduplicates. Without `no_autoflush`, same-batch dedup works better, not worse.
|
||||
|
||||
### Error handling in `index()`
|
||||
After a rollback, SQLAlchemy expires all object attributes. Accessing any attribute
|
||||
on `document` after rollback raises `MissingGreenlet` in async mode.
|
||||
The correct pattern:
|
||||
```python
|
||||
except Exception as e:
|
||||
await self.session.rollback()
|
||||
await self.session.refresh(document) # reload from DB before touching attributes
|
||||
await self._write_failed(document, str(e))
|
||||
await self.session.commit()
|
||||
```
|
||||
|
||||
### `safe_set_chunks` requires `selectinload`
|
||||
`safe_set_chunks` uses `set_committed_value` which marks old chunks as orphans for
|
||||
cascade deletion. This only works if the old chunks were loaded into the session first.
|
||||
`_find_existing_by_identifier` MUST always use `selectinload(Document.chunks)`.
|
||||
Any call to `_write_ready` on a document not loaded with `selectinload` will silently
|
||||
accumulate chunks instead of replacing them.
|
||||
|
||||
### Phase 1 batch size
|
||||
Not implemented upfront. If a connector proves to need it during migration, add a
|
||||
`batch_size` parameter to `prepare_for_indexing` at that point.
|
||||
|
||||
### `session.refresh()` is not needed after Phase 1 commit
|
||||
After Phase 1's commit, `index()` only SETs attributes on the document — it never
|
||||
READs them. SQLAlchemy write operations do not trigger lazy loading. No refresh needed.
|
||||
|
||||
---
|
||||
|
||||
## Connector Deviations — Handled at Migration Time
|
||||
|
||||
| Connector | Deviation | How handled |
|
||||
|---|---|---|
|
||||
| Webcrawler | Content unknown at Phase 1 — fetched during Phase 2 | Call `prepare_for_indexing` per URL after crawling, not in bulk |
|
||||
| Slack, Discord | No LLM summarization | `should_summarize = False` in `ConnectorDocument` |
|
||||
| GitHub | Custom 4000-char character chunking | `content_type = "code"` → `code_chunker_instance` |
|
||||
| Obsidian | LLM called but result discarded, raw content embedded | `should_summarize = True` → pipeline fixes this correctly |
|
||||
|
||||
---
|
||||
|
||||
## Testing Philosophy
|
||||
|
||||
Two rules drive every decision below:
|
||||
|
||||
**1. Test behavior through public interfaces, not implementation details.**
|
||||
`IndexingPipelineService` exposes two public methods: `prepare_for_indexing` and
|
||||
`index`. Private methods (`_create`, `_summarize`, `_mark_ready`, etc.) are
|
||||
implementation details. Testing them directly couples tests to internal structure —
|
||||
renaming a private method breaks tests even though behavior hasn't changed.
|
||||
|
||||
**2. Mock only true external boundaries.**
|
||||
Mocking internal collaborators (session, chunker) tests your assumptions about
|
||||
those collaborators, not the actual code. A mock never fails because of a bug in
|
||||
the real system. The DB is not mocked — a real test DB is used. Only the LLM and
|
||||
embedding model are mocked: they are genuine external services (cost, latency,
|
||||
non-determinism, network flakiness).
|
||||
|
||||
**Consequence:** most tests are integration tests. Unit tests cover only pure
|
||||
functions and value objects that have no dependencies.
|
||||
|
||||
---
|
||||
|
||||
## Test Plan
|
||||
|
||||
### Unit tests — pure functions and value objects, no mocks
|
||||
|
||||
#### `ConnectorDocument`
|
||||
|
||||
| Test | Checks |
|
||||
|---|---|
|
||||
| `source_markdown` empty → validation error | rejection of invalid input |
|
||||
| `source_markdown` whitespace only → validation error | rejection of invalid input |
|
||||
| Valid document created with defaults | all default values correct |
|
||||
| `should_summarize=False` accepted | optional flag works |
|
||||
| `content_type="code"` accepted | optional flag works |
|
||||
|
||||
#### `_compute_unique_identifier_hash` + `_compute_content_hash`
|
||||
|
||||
| Test | Checks |
|
||||
|---|---|
|
||||
| Same input → same hash (both functions) | determinism |
|
||||
| Different `unique_id` → different hash | isolation by field |
|
||||
| Different `search_space_id` → different hash | isolation by space |
|
||||
| Different `document_type` → different hash | isolation by type |
|
||||
| Same content, different space → different hash | space-scoped content dedup |
|
||||
| Different content → different hash | content sensitivity |
|
||||
|
||||
#### Adapters
|
||||
|
||||
Each adapter is a pure static converter. Zero mocks needed.
|
||||
|
||||
| Test | Checks |
|
||||
|---|---|
|
||||
| `to_connector_document` returns valid `ConnectorDocument` | output type |
|
||||
| `unique_id` maps to correct source field | field mapping |
|
||||
| `document_type` is correct enum value | enum correctness |
|
||||
| `source_markdown` contains expected content | content mapping |
|
||||
| `should_summarize` set correctly for this connector | flag correctness |
|
||||
| `content_type` set correctly (code vs text) | flag correctness |
|
||||
| Empty/missing fields handled gracefully | robustness |
|
||||
|
||||
---
|
||||
|
||||
### Integration tests — real DB, LLM and embedding model mocked
|
||||
|
||||
#### `prepare_for_indexing`
|
||||
|
||||
| Test | Checks |
|
||||
|---|---|
|
||||
| New document → status=pending in DB, returned | `_create` + hash logic |
|
||||
| Same `content_hash`, same title → skipped, not returned | unchanged dedup |
|
||||
| Same `content_hash`, different title → title updated, not returned | title-only update |
|
||||
| Changed `content_hash` → existing doc returned for reprocessing | update detection |
|
||||
| Duplicate content hash within batch → skipped | same-batch dedup |
|
||||
| Multiple documents → one commit for all | batch commit |
|
||||
|
||||
#### `index`
|
||||
|
||||
| Test | Mocks | Checks |
|
||||
|---|---|---|
|
||||
| `document.status = ready` after call | llm, embedding | happy path |
|
||||
| `document.content = summary` when `should_summarize=True` | llm, embedding | summarization used |
|
||||
| `document.content = source_markdown` when `should_summarize=False` | embedding | summarization skipped |
|
||||
| `document.source_markdown` always set | llm, embedding | field always written |
|
||||
| Chunks written to DB | llm, embedding | chunking + persistence |
|
||||
| `document.status = processing` set before heavy work | llm (slow), embedding | status progression |
|
||||
| LLM raises → `document.status = failed` | llm (raises) | error handling |
|
||||
| After LLM error → not stuck in `processing` | llm (raises) | recovery correctness |
|
||||
| After LLM error → chunks NOT written | llm (raises) | no partial data |
|
||||
|
||||
---
|
||||
|
||||
## What Does NOT Need Tests
|
||||
|
||||
| Component | Reason |
|
||||
|---|---|
|
||||
| Private methods of `IndexingPipelineService` | covered by integration tests on public interface |
|
||||
| Celery task wiring | orchestration only, no logic |
|
||||
| External API calls (Notion, Slack, etc.) | external dependency, not our logic |
|
||||
| `get_user_long_context_llm` | existing service, not our code |
|
||||
| `calculate_date_range` | already has a `# FIX:` comment — needs its own separate tests |
|
||||
| Full end-to-end indexer run | too many moving parts, not unit or integration |
|
||||
|
||||
---
|
||||
|
||||
## Files To Create
|
||||
|
||||
```
|
||||
app/indexing_pipeline/
|
||||
connector_document.py ← ConnectorDocument model
|
||||
adapters/
|
||||
clickup_adapter.py ← first adapter (pilot)
|
||||
notion_adapter.py
|
||||
slack_adapter.py
|
||||
...
|
||||
google/ ← created when 2nd Google connector is migrated
|
||||
shared.py ← shared pure helpers (metadata, markdown conversion)
|
||||
google_drive_adapter.py
|
||||
google_docs_adapter.py
|
||||
|
||||
app/clients/ ← created when 2nd connector of a family is migrated
|
||||
google_client.py ← shared OAuth + API client for all Google connectors
|
||||
|
||||
app/indexing_pipeline/
|
||||
indexing_pipeline_service.py ← IndexingPipelineService
|
||||
|
||||
tests/
|
||||
unit/
|
||||
indexing_pipeline/
|
||||
test_connector_document.py
|
||||
test_compute_hashes.py
|
||||
adapters/
|
||||
test_clickup_adapter.py
|
||||
test_notion_adapter.py
|
||||
test_slack_adapter.py
|
||||
integration/
|
||||
indexing_pipeline/
|
||||
test_prepare_for_indexing.py
|
||||
test_index.py
|
||||
```
|
||||
|
||||
## TDD Build Order
|
||||
|
||||
One test → one implementation → repeat. Never write multiple tests before making
|
||||
the previous one green.
|
||||
|
||||
---
|
||||
|
||||
### Phase 1 — `ConnectorDocument` (unit)
|
||||
|
||||
**File:** `tests/unit/indexing_pipeline/test_connector_document.py`
|
||||
**Implements:** `app/indexing_pipeline/connector_document.py`
|
||||
|
||||
- `test_empty_source_markdown_raises`
|
||||
- `test_whitespace_only_source_markdown_raises`
|
||||
- `test_valid_document_created_with_defaults`
|
||||
- `test_should_summarize_false_accepted`
|
||||
- `test_content_type_code_accepted`
|
||||
|
||||
→ After this phase: add `make_connector_document` factory fixture to `tests/conftest.py`
|
||||
|
||||
---
|
||||
|
||||
### Phase 2 — Hash functions (unit)
|
||||
|
||||
**File:** `tests/unit/indexing_pipeline/test_compute_hashes.py`
|
||||
**Implements:** `_compute_unique_identifier_hash`, `_compute_content_hash` on `IndexingPipelineService`
|
||||
|
||||
- `test_same_input_same_hash`
|
||||
- `test_different_unique_id_different_hash`
|
||||
- `test_different_search_space_id_different_hash`
|
||||
- `test_different_document_type_different_hash`
|
||||
- `test_same_content_same_space_same_hash`
|
||||
- `test_same_content_different_space_different_hash`
|
||||
- `test_different_content_different_hash`
|
||||
|
||||
---
|
||||
|
||||
### Phase 3 — `prepare_for_indexing` (integration)
|
||||
|
||||
**File:** `tests/integration/indexing_pipeline/test_prepare_for_indexing.py`
|
||||
**Implements:** `prepare_for_indexing` + all private DB methods it uses
|
||||
|
||||
- `test_new_document_written_as_pending`
|
||||
- `test_unchanged_document_skipped`
|
||||
- `test_title_only_change_updates_title`
|
||||
- `test_changed_content_queued_for_reprocessing`
|
||||
- `test_duplicate_content_in_batch_skipped`
|
||||
- `test_multiple_documents_one_commit`
|
||||
|
||||
---
|
||||
|
||||
### Phase 4 — `index` (integration)
|
||||
|
||||
**File:** `tests/integration/indexing_pipeline/test_index.py`
|
||||
**Implements:** `index` + all private processing methods it uses
|
||||
|
||||
- `test_sets_status_ready`
|
||||
- `test_content_is_summary_when_should_summarize_true`
|
||||
- `test_content_is_source_markdown_when_should_summarize_false`
|
||||
- `test_source_markdown_always_set`
|
||||
- `test_chunks_written_to_db`
|
||||
- `test_status_processing_set_before_heavy_work`
|
||||
- `test_llm_error_sets_status_failed`
|
||||
- `test_llm_error_document_not_stuck_in_processing`
|
||||
- `test_llm_error_chunks_not_written`
|
||||
|
||||
---
|
||||
|
||||
### Phase 5 — Adapters (unit, one file per connector)
|
||||
|
||||
Start with ClickUp — simplest connector, no special deviations.
|
||||
|
||||
**File:** `tests/unit/adapters/test_clickup_adapter.py`
|
||||
**Implements:** `app/pipeline/adapters/clickup_adapter.py`
|
||||
|
||||
Then repeat for each connector as it is migrated.
|
||||
|
||||
---
|
||||
|
||||
**Total: 12 unit tests + 15 integration tests = 27 tests.**
|
||||
|
||||
---
|
||||
|
||||
## Out of Scope (for now)
|
||||
|
||||
Document processors (`youtube_processor`, `file_processors`, `markdown_processor`,
|
||||
`extension_processor`, `circleback_processor`) follow the same 2-phase pattern and
|
||||
will eventually migrate to the pipeline. Excluded from this refactor — they handle
|
||||
user uploads, not connector syncs, and their migration is a separate task.
|
||||
Loading…
Add table
Add a link
Reference in a new issue