diff --git a/surfsense_backend/alembic/versions/129_deactivate_legacy_obsidian_connectors.py b/surfsense_backend/alembic/versions/129_deactivate_legacy_obsidian_connectors.py index 42808b1ca..b043d2af2 100644 --- a/surfsense_backend/alembic/versions/129_deactivate_legacy_obsidian_connectors.py +++ b/surfsense_backend/alembic/versions/129_deactivate_legacy_obsidian_connectors.py @@ -4,22 +4,11 @@ Revision ID: 129 Revises: 128 Create Date: 2026-04-18 -Marks every pre-plugin OBSIDIAN_CONNECTOR row as legacy. We keep the -rows (and their indexed Documents) so existing search results don't -suddenly disappear, but we: - -* set ``is_indexable = false`` and ``periodic_indexing_enabled = false`` - so the scheduler will never fire a server-side scan again, -* clear ``next_scheduled_at`` so the scheduler stops considering the - row, -* merge ``{"legacy": true, "deactivated_at": ""}`` into ``config`` - so the new ObsidianConfig view in the web UI can render the - migration banner (and so a future cleanup script can find them). - -A row is "pre-plugin" when its ``config`` does not already have -``source = "plugin"``. The new plugin indexer always writes -``config.source = "plugin"`` on first /obsidian/connect, so this -predicate is stable. +Deactivates pre-plugin OBSIDIAN_CONNECTOR rows (keeping them and their +Documents, but flagging ``config.legacy = true`` and disabling scheduling) +and creates the partial unique index on +``(user_id, (config->>'vault_id'))`` for plugin-Obsidian rows that backs +the ``/obsidian/connect`` upsert. """ from __future__ import annotations @@ -60,9 +49,27 @@ def upgrade() -> None: ) ) + conn.execute( + sa.text( + """ + CREATE UNIQUE INDEX search_source_connectors_obsidian_plugin_vault_uniq + ON search_source_connectors (user_id, ((config->>'vault_id'))) + WHERE connector_type = 'OBSIDIAN_CONNECTOR' + AND config->>'source' = 'plugin' + AND config->>'vault_id' IS NOT NULL + """ + ) + ) + def downgrade() -> None: conn = op.get_bind() + conn.execute( + sa.text( + "DROP INDEX IF EXISTS " + "search_source_connectors_obsidian_plugin_vault_uniq" + ) + ) conn.execute( sa.text( """ diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 16b40983e..7cf9b332d 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -1510,6 +1510,18 @@ class SearchSourceConnector(BaseModel, TimestampMixin): "name", name="uq_searchspace_user_connector_type_name", ), + # Mirrors migration 129; backs the ``/obsidian/connect`` upsert. 
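+        # (The production DDL comes from migration 129; declaring the
+        # index here as well keeps any schema built via
+        # ``metadata.create_all()`` consistent with it.)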
+ Index( + "search_source_connectors_obsidian_plugin_vault_uniq", + "user_id", + text("(config->>'vault_id')"), + unique=True, + postgresql_where=text( + "connector_type = 'OBSIDIAN_CONNECTOR' " + "AND config->>'source' = 'plugin' " + "AND config->>'vault_id' IS NOT NULL" + ), + ), ) name = Column(String(100), nullable=False, index=True) diff --git a/surfsense_backend/app/routes/obsidian_plugin_routes.py b/surfsense_backend/app/routes/obsidian_plugin_routes.py index 63522de74..c1b706154 100644 --- a/surfsense_backend/app/routes/obsidian_plugin_routes.py +++ b/surfsense_backend/app/routes/obsidian_plugin_routes.py @@ -10,8 +10,10 @@ from __future__ import annotations import logging from datetime import UTC, datetime +import sqlalchemy as sa from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import and_, case, func +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select @@ -27,10 +29,17 @@ from app.db import ( from app.schemas.obsidian_plugin import ( ConnectRequest, ConnectResponse, + DeleteAck, + DeleteAckItem, DeleteBatchRequest, HealthResponse, ManifestResponse, + RenameAck, + RenameAckItem, RenameBatchRequest, + StatsResponse, + SyncAck, + SyncAckItem, SyncBatchRequest, ) from app.services.obsidian_plugin_indexer import ( @@ -66,13 +75,15 @@ async def _resolve_vault_connector( vault_id: str, ) -> SearchSourceConnector: """Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user.""" + # ``config`` is core ``JSON`` (not ``JSONB``); ``as_string()`` is the + # cross-dialect equivalent of ``.astext`` and compiles to ``->>``. stmt = select(SearchSourceConnector).where( and_( SearchSourceConnector.user_id == user.id, SearchSourceConnector.connector_type == SearchSourceConnectorType.OBSIDIAN_CONNECTOR, - SearchSourceConnector.config["vault_id"].astext == vault_id, - SearchSourceConnector.config["source"].astext == "plugin", + SearchSourceConnector.config["vault_id"].as_string() == vault_id, + SearchSourceConnector.config["source"].as_string() == "plugin", ) ) @@ -139,36 +150,15 @@ async def obsidian_connect( user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), ) -> ConnectResponse: - """Register a vault, or return the existing connector row. + """Register a vault, or refresh the existing connector row. - Idempotent on (user_id, OBSIDIAN_CONNECTOR, vault_id). Called on every - plugin onload as a heartbeat. + Idempotent on ``(user_id, OBSIDIAN_CONNECTOR, vault_id)`` via the partial + unique index from migration 129. Called on every plugin onload. """ await _ensure_search_space_access( session, user=user, search_space_id=payload.search_space_id ) - # FOR UPDATE so concurrent /connect calls for the same vault can't race. 
- existing: SearchSourceConnector | None = ( - ( - await session.execute( - select(SearchSourceConnector) - .where( - and_( - SearchSourceConnector.user_id == user.id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.OBSIDIAN_CONNECTOR, - SearchSourceConnector.config["vault_id"].astext - == payload.vault_id, - ) - ) - .with_for_update() - ) - ) - .scalars() - .first() - ) - now_iso = datetime.now(UTC).isoformat() cfg = { "vault_id": payload.vault_id, @@ -176,50 +166,68 @@ async def obsidian_connect( "source": "plugin", "last_connect_at": now_iso, } + display_name = f"Obsidian \u2014 {payload.vault_name}" - if existing is not None: - existing.config = cfg - # Re-stamp on every connect so vault renames in Obsidian propagate; - # the web UI hides the Name input for Obsidian connectors. - existing.name = f"Obsidian — {payload.vault_name}" - existing.is_indexable = False - existing.search_space_id = payload.search_space_id - await session.commit() - await session.refresh(existing) - connector = existing - else: - connector = SearchSourceConnector( - name=f"Obsidian — {payload.vault_name}", + # ``index_elements`` + ``index_where`` matches the partial unique index + # by shape; ``ON CONFLICT ON CONSTRAINT`` doesn't work for partial indexes. + stmt = ( + pg_insert(SearchSourceConnector) + .values( + name=display_name, connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR, is_indexable=False, config=cfg, user_id=user.id, search_space_id=payload.search_space_id, ) - session.add(connector) - await session.commit() - await session.refresh(connector) + .on_conflict_do_update( + index_elements=[ + SearchSourceConnector.user_id, + sa.text("(config->>'vault_id')"), + ], + index_where=sa.text( + "connector_type = 'OBSIDIAN_CONNECTOR' " + "AND config->>'source' = 'plugin' " + "AND config->>'vault_id' IS NOT NULL" + ), + set_={ + "name": display_name, + "config": cfg, + "search_space_id": payload.search_space_id, + "is_indexable": False, + }, + ) + .returning(SearchSourceConnector) + ) + + result = await session.execute(stmt) + connector = result.scalar_one() + # Read attrs before commit; ``expire_on_commit=True`` would force a + # lazy refresh that fails with ``MissingGreenlet`` during serialization. 
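+    # (e.g. evaluating ``connector.id`` for ``ConnectResponse`` after the
+    # commit would hit an expired attribute and raise exactly that.)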
+ connector_id = connector.id + connector_search_space_id = connector.search_space_id + await session.commit() return ConnectResponse( - connector_id=connector.id, + connector_id=connector_id, vault_id=payload.vault_id, - search_space_id=connector.search_space_id, + search_space_id=connector_search_space_id, **_build_handshake(), ) -@router.post("/sync") +@router.post("/sync", response_model=SyncAck) async def obsidian_sync( payload: SyncBatchRequest, user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), -) -> dict[str, object]: +) -> SyncAck: """Batch-upsert notes; returns per-note ack so the plugin can dequeue/retry.""" connector = await _resolve_vault_connector( session, user=user, vault_id=payload.vault_id ) - results: list[dict[str, object]] = [] + items: list[SyncAckItem] = [] indexed = 0 failed = 0 @@ -229,8 +237,8 @@ async def obsidian_sync( session, connector=connector, payload=note, user_id=str(user.id) ) indexed += 1 - results.append( - {"path": note.path, "status": "ok", "document_id": doc.id} + items.append( + SyncAckItem(path=note.path, status="ok", document_id=doc.id) ) except HTTPException: raise @@ -241,30 +249,30 @@ async def obsidian_sync( note.path, payload.vault_id, ) - results.append( - {"path": note.path, "status": "error", "error": str(exc)[:300]} + items.append( + SyncAckItem(path=note.path, status="error", error=str(exc)[:300]) ) - return { - "vault_id": payload.vault_id, - "indexed": indexed, - "failed": failed, - "results": results, - } + return SyncAck( + vault_id=payload.vault_id, + indexed=indexed, + failed=failed, + items=items, + ) -@router.post("/rename") +@router.post("/rename", response_model=RenameAck) async def obsidian_rename( payload: RenameBatchRequest, user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), -) -> dict[str, object]: +) -> RenameAck: """Apply a batch of vault rename events.""" connector = await _resolve_vault_connector( session, user=user, vault_id=payload.vault_id ) - results: list[dict[str, object]] = [] + items: list[RenameAckItem] = [] renamed = 0 missing = 0 @@ -279,22 +287,22 @@ async def obsidian_rename( ) if doc is None: missing += 1 - results.append( - { - "old_path": item.old_path, - "new_path": item.new_path, - "status": "missing", - } + items.append( + RenameAckItem( + old_path=item.old_path, + new_path=item.new_path, + status="missing", + ) ) else: renamed += 1 - results.append( - { - "old_path": item.old_path, - "new_path": item.new_path, - "status": "ok", - "document_id": doc.id, - } + items.append( + RenameAckItem( + old_path=item.old_path, + new_path=item.new_path, + status="ok", + document_id=doc.id, + ) ) except Exception as exc: logger.exception( @@ -303,29 +311,29 @@ async def obsidian_rename( item.new_path, payload.vault_id, ) - results.append( - { - "old_path": item.old_path, - "new_path": item.new_path, - "status": "error", - "error": str(exc)[:300], - } + items.append( + RenameAckItem( + old_path=item.old_path, + new_path=item.new_path, + status="error", + error=str(exc)[:300], + ) ) - return { - "vault_id": payload.vault_id, - "renamed": renamed, - "missing": missing, - "results": results, - } + return RenameAck( + vault_id=payload.vault_id, + renamed=renamed, + missing=missing, + items=items, + ) -@router.delete("/notes") +@router.delete("/notes", response_model=DeleteAck) async def obsidian_delete_notes( payload: DeleteBatchRequest, user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), 
-) -> dict[str, object]: +) -> DeleteAck: """Soft-delete a batch of notes by vault-relative path.""" connector = await _resolve_vault_connector( session, user=user, vault_id=payload.vault_id @@ -333,7 +341,7 @@ async def obsidian_delete_notes( deleted = 0 missing = 0 - results: list[dict[str, object]] = [] + items: list[DeleteAckItem] = [] for path in payload.paths: try: ok = await delete_note( @@ -344,26 +352,26 @@ async def obsidian_delete_notes( ) if ok: deleted += 1 - results.append({"path": path, "status": "ok"}) + items.append(DeleteAckItem(path=path, status="ok")) else: missing += 1 - results.append({"path": path, "status": "missing"}) + items.append(DeleteAckItem(path=path, status="missing")) except Exception as exc: logger.exception( "obsidian DELETE /notes failed for path=%s vault=%s", path, payload.vault_id, ) - results.append( - {"path": path, "status": "error", "error": str(exc)[:300]} + items.append( + DeleteAckItem(path=path, status="error", error=str(exc)[:300]) ) - return { - "vault_id": payload.vault_id, - "deleted": deleted, - "missing": missing, - "results": results, - } + return DeleteAck( + vault_id=payload.vault_id, + deleted=deleted, + missing=missing, + items=items, + ) @router.get("/manifest", response_model=ManifestResponse) @@ -379,12 +387,12 @@ async def obsidian_manifest( return await get_manifest(session, connector=connector, vault_id=vault_id) -@router.get("/stats") +@router.get("/stats", response_model=StatsResponse) async def obsidian_stats( vault_id: str = Query(..., description="Plugin-side stable vault UUID"), user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), -) -> dict[str, object]: +) -> StatsResponse: """Active-note count + last sync time for the web tile. ``files_synced`` excludes tombstones so it matches ``/manifest``; @@ -394,7 +402,7 @@ async def obsidian_stats( session, user=user, vault_id=vault_id ) - is_active = Document.document_metadata["deleted_at"].astext.is_(None) + is_active = Document.document_metadata["deleted_at"].as_string().is_(None) row = ( await session.execute( @@ -410,8 +418,8 @@ async def obsidian_stats( ) ).first() - return { - "vault_id": vault_id, - "files_synced": int(row[0] or 0), - "last_sync_at": row[1].isoformat() if row[1] else None, - } + return StatsResponse( + vault_id=vault_id, + files_synced=int(row[0] or 0), + last_sync_at=row[1], + ) diff --git a/surfsense_backend/app/schemas/obsidian_plugin.py b/surfsense_backend/app/schemas/obsidian_plugin.py index 0b9f0f3bf..fd4230558 100644 --- a/surfsense_backend/app/schemas/obsidian_plugin.py +++ b/surfsense_backend/app/schemas/obsidian_plugin.py @@ -8,7 +8,7 @@ prefix (``/api/v2/...``). from __future__ import annotations from datetime import datetime -from typing import Any +from typing import Any, Literal from pydantic import BaseModel, ConfigDict, Field @@ -109,3 +109,58 @@ class HealthResponse(_PluginBase): capabilities: list[str] server_time_utc: datetime + + +# Per-item batch ack schemas — wire shape is load-bearing for the plugin +# queue (see api-client.ts / sync-engine.ts:processBatch). + + +class SyncAckItem(_PluginBase): + path: str + status: Literal["ok", "error"] + document_id: int | None = None + error: str | None = None + + +class SyncAck(_PluginBase): + vault_id: str + indexed: int + failed: int + items: list[SyncAckItem] = Field(default_factory=list) + + +class RenameAckItem(_PluginBase): + old_path: str + new_path: str + # ``missing`` is treated as success client-side (end state reached). 
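+    # ("missing" means nothing was indexed under ``old_path``, so there is
+    # nothing to move; a later content upsert can still create the note at
+    # ``new_path``.)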
+    status: Literal["ok", "error", "missing"]
+    document_id: int | None = None
+    error: str | None = None
+
+
+class RenameAck(_PluginBase):
+    vault_id: str
+    renamed: int
+    missing: int
+    items: list[RenameAckItem] = Field(default_factory=list)
+
+
+class DeleteAckItem(_PluginBase):
+    path: str
+    status: Literal["ok", "error", "missing"]
+    error: str | None = None
+
+
+class DeleteAck(_PluginBase):
+    vault_id: str
+    deleted: int
+    missing: int
+    items: list[DeleteAckItem] = Field(default_factory=list)
+
+
+class StatsResponse(_PluginBase):
+    """Backs the Obsidian connector tile in the web UI."""
+
+    vault_id: str
+    files_synced: int
+    last_sync_at: datetime | None = None
diff --git a/surfsense_backend/tests/integration/test_obsidian_plugin_routes.py b/surfsense_backend/tests/integration/test_obsidian_plugin_routes.py
new file mode 100644
index 000000000..72835c7f0
--- /dev/null
+++ b/surfsense_backend/tests/integration/test_obsidian_plugin_routes.py
@@ -0,0 +1,379 @@
+"""Integration tests for the Obsidian plugin HTTP wire contract.
+
+Two concerns:
+
+1. The /connect upsert really collapses concurrent first-time connects to
+   exactly one row. This is the behaviour the partial unique index in
+   migration 129 exists to guarantee.
+2. The end-to-end response shapes returned by /connect, /sync, /rename,
+   /notes, /manifest, and /stats match the schemas the plugin's
+   TypeScript decoders expect. Each field rename (results -> items,
+   accepted -> indexed, etc.) is a contract change, and a smoke pass
+   like this is the cheapest way to catch future drift before it ships.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import uuid
+from datetime import UTC, datetime
+from unittest.mock import AsyncMock, patch
+
+import pytest
+import pytest_asyncio
+from sqlalchemy import func, select, text
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.db import (
+    SearchSourceConnector,
+    SearchSourceConnectorType,
+    SearchSpace,
+    User,
+)
+from app.routes.obsidian_plugin_routes import (
+    obsidian_connect,
+    obsidian_delete_notes,
+    obsidian_manifest,
+    obsidian_rename,
+    obsidian_stats,
+    obsidian_sync,
+)
+from app.schemas.obsidian_plugin import (
+    ConnectRequest,
+    DeleteAck,
+    DeleteBatchRequest,
+    ManifestResponse,
+    NotePayload,
+    RenameAck,
+    RenameBatchRequest,
+    RenameItem,
+    StatsResponse,
+    SyncAck,
+    SyncBatchRequest,
+)
+
+pytestmark = pytest.mark.integration
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_note_payload(vault_id: str, path: str, content_hash: str) -> NotePayload:
+    """Minimal NotePayload that the schema accepts; the indexer is mocked
+    out so the values don't have to round-trip through the real pipeline."""
+    now = datetime.now(UTC)
+    return NotePayload(
+        vault_id=vault_id,
+        path=path,
+        name=path.rsplit("/", 1)[-1].rsplit(".", 1)[0],
+        extension="md",
+        content="# Test\n\nbody",
+        content_hash=content_hash,
+        mtime=now,
+        ctime=now,
+    )
+
+
+@pytest_asyncio.fixture
+async def race_user_and_space(async_engine):
+    """User + SearchSpace committed via the live engine so the two
+    concurrent /connect sessions in the race test can both see them.
+
+    We can't use the savepoint-trapped ``db_session`` fixture here
+    because the concurrent sessions need to see committed rows.
+ """ + user_id = uuid.uuid4() + async with AsyncSession(async_engine) as setup: + user = User( + id=user_id, + email=f"obsidian-race-{uuid.uuid4()}@surfsense.test", + hashed_password="x", + is_active=True, + is_superuser=False, + is_verified=True, + ) + space = SearchSpace(name="Race Space", user_id=user_id) + setup.add_all([user, space]) + await setup.commit() + await setup.refresh(space) + space_id = space.id + + yield user_id, space_id + + async with AsyncSession(async_engine) as cleanup: + # Order matters: connectors -> documents -> space -> user. The + # connectors test creates documents, so we wipe them too. The + # CASCADE on user_id catches anything we missed. + await cleanup.execute( + text( + 'DELETE FROM search_source_connectors WHERE user_id = :uid' + ), + {"uid": user_id}, + ) + await cleanup.execute( + text("DELETE FROM searchspaces WHERE id = :id"), + {"id": space_id}, + ) + await cleanup.execute( + text('DELETE FROM "user" WHERE id = :uid'), + {"uid": user_id}, + ) + await cleanup.commit() + + +# --------------------------------------------------------------------------- +# /connect race + index enforcement +# --------------------------------------------------------------------------- + + +class TestConnectRace: + async def test_concurrent_first_connects_collapse_to_one_row( + self, async_engine, race_user_and_space + ): + """Two simultaneous /connect calls for the same vault_id should + produce exactly one row, not two. This relies on the partial + unique index added in migration 129 plus the + ON CONFLICT DO UPDATE in obsidian_connect.""" + user_id, space_id = race_user_and_space + vault_id = str(uuid.uuid4()) + + async def _call(name_suffix: str) -> None: + async with AsyncSession(async_engine) as s: + fresh_user = await s.get(User, user_id) + payload = ConnectRequest( + vault_id=vault_id, + vault_name=f"My Vault {name_suffix}", + search_space_id=space_id, + ) + await obsidian_connect(payload, user=fresh_user, session=s) + + results = await asyncio.gather( + _call("a"), _call("b"), return_exceptions=True + ) + # Both calls should succeed (ON CONFLICT collapses, doesn't raise). + for r in results: + assert not isinstance(r, Exception), f"Connect raised: {r!r}" + + # The fixture creates a fresh user per test, so a count scoped to + # ``user_id`` is equivalent to "rows for this vault" without + # needing the JSON-path filter (which only works against a real + # postgres dialect at compile time, not against a bare model + # expression in a test). + async with AsyncSession(async_engine) as verify: + count = ( + await verify.execute( + select(func.count(SearchSourceConnector.id)).where( + SearchSourceConnector.user_id == user_id, + ) + ) + ).scalar_one() + assert count == 1 + + async def test_partial_unique_index_blocks_raw_duplicate( + self, async_engine, race_user_and_space + ): + """If the partial unique index were missing, two raw INSERTs of + plugin-Obsidian rows for the same (user_id, vault_id) would both + succeed. With the index in place the second one must raise + IntegrityError. 
This guards the schema regardless of whether the + route logic is correct.""" + user_id, space_id = race_user_and_space + vault_id = str(uuid.uuid4()) + + async with AsyncSession(async_engine) as s: + s.add( + SearchSourceConnector( + name="Obsidian \u2014 First", + connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR, + is_indexable=False, + config={ + "vault_id": vault_id, + "vault_name": "First", + "source": "plugin", + }, + user_id=user_id, + search_space_id=space_id, + ) + ) + await s.commit() + + with pytest.raises(IntegrityError): + async with AsyncSession(async_engine) as s: + s.add( + SearchSourceConnector( + name="Obsidian \u2014 Second", + connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR, + is_indexable=False, + config={ + "vault_id": vault_id, + "vault_name": "Second", + "source": "plugin", + }, + user_id=user_id, + search_space_id=space_id, + ) + ) + await s.commit() + + +# --------------------------------------------------------------------------- +# Combined wire-shape smoke test +# --------------------------------------------------------------------------- + + +class TestWireContractSmoke: + """Walks /connect -> /sync -> /rename -> /notes -> /manifest -> /stats + sequentially and asserts each response matches the new schema. With + `response_model=` on every route, FastAPI is already validating the + shape on real traffic; this test mainly guards against accidental + field renames the way the TypeScript decoder would catch them.""" + + async def test_full_flow_returns_typed_payloads( + self, db_session: AsyncSession, db_user: User, db_search_space: SearchSpace + ): + vault_id = str(uuid.uuid4()) + + # 1. /connect + connect_resp = await obsidian_connect( + ConnectRequest( + vault_id=vault_id, + vault_name="Smoke Vault", + search_space_id=db_search_space.id, + ), + user=db_user, + session=db_session, + ) + assert connect_resp.connector_id > 0 + assert connect_resp.vault_id == vault_id + assert "sync" in connect_resp.capabilities + + # 2. /sync — stub the indexer so the call doesn't drag the LLM / + # embedding pipeline in. We're testing the wire contract, not the + # indexer itself. + fake_doc = type("FakeDoc", (), {"id": 12345})() + with patch( + "app.routes.obsidian_plugin_routes.upsert_note", + new=AsyncMock(return_value=fake_doc), + ): + sync_resp = await obsidian_sync( + SyncBatchRequest( + vault_id=vault_id, + notes=[ + _make_note_payload(vault_id, "ok.md", "hash-ok"), + _make_note_payload(vault_id, "fail.md", "hash-fail"), + ], + ), + user=db_user, + session=db_session, + ) + + assert isinstance(sync_resp, SyncAck) + assert sync_resp.vault_id == vault_id + assert sync_resp.indexed == 2 + assert sync_resp.failed == 0 + assert len(sync_resp.items) == 2 + assert all(it.status == "ok" for it in sync_resp.items) + # The TypeScript decoder filters on items[].status === "error" and + # extracts .path, so confirm both fields are present and named. + assert {it.path for it in sync_resp.items} == {"ok.md", "fail.md"} + + # 2b. Re-run /sync but force the indexer to raise on one note so + # the per-item failure decoder gets exercised end-to-end. 
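+        # (The stub mirrors the route's keyword-only call shape:
+        # ``upsert_note(session, connector=..., payload=..., user_id=...)``.)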
+ async def _selective_upsert(session, *, connector, payload, user_id): + if payload.path == "fail.md": + raise RuntimeError("simulated indexing failure") + return fake_doc + + with patch( + "app.routes.obsidian_plugin_routes.upsert_note", + new=AsyncMock(side_effect=_selective_upsert), + ): + sync_resp = await obsidian_sync( + SyncBatchRequest( + vault_id=vault_id, + notes=[ + _make_note_payload(vault_id, "ok.md", "h1"), + _make_note_payload(vault_id, "fail.md", "h2"), + ], + ), + user=db_user, + session=db_session, + ) + assert sync_resp.indexed == 1 + assert sync_resp.failed == 1 + statuses = {it.path: it.status for it in sync_resp.items} + assert statuses == {"ok.md": "ok", "fail.md": "error"} + + # 3. /rename — patch rename_note so we don't need a real Document. + async def _rename(*args, **kwargs) -> object: + if kwargs.get("old_path") == "missing.md": + return None + return fake_doc + + with patch( + "app.routes.obsidian_plugin_routes.rename_note", + new=AsyncMock(side_effect=_rename), + ): + rename_resp = await obsidian_rename( + RenameBatchRequest( + vault_id=vault_id, + renames=[ + RenameItem(old_path="a.md", new_path="b.md"), + RenameItem(old_path="missing.md", new_path="x.md"), + ], + ), + user=db_user, + session=db_session, + ) + assert isinstance(rename_resp, RenameAck) + assert rename_resp.renamed == 1 + assert rename_resp.missing == 1 + assert {it.status for it in rename_resp.items} == {"ok", "missing"} + # snake_case fields are deliberate — the plugin decoder maps them + # to camelCase explicitly. + assert all( + it.old_path and it.new_path for it in rename_resp.items + ) + + # 4. /notes DELETE + async def _delete(*args, **kwargs) -> bool: + return kwargs.get("path") != "ghost.md" + + with patch( + "app.routes.obsidian_plugin_routes.delete_note", + new=AsyncMock(side_effect=_delete), + ): + delete_resp = await obsidian_delete_notes( + DeleteBatchRequest(vault_id=vault_id, paths=["b.md", "ghost.md"]), + user=db_user, + session=db_session, + ) + assert isinstance(delete_resp, DeleteAck) + assert delete_resp.deleted == 1 + assert delete_resp.missing == 1 + assert {it.path: it.status for it in delete_resp.items} == { + "b.md": "ok", + "ghost.md": "missing", + } + + # 5. /manifest — empty (no real Documents were created because + # upsert_note was mocked) but the response shape is what we care + # about. + manifest_resp = await obsidian_manifest( + vault_id=vault_id, user=db_user, session=db_session + ) + assert isinstance(manifest_resp, ManifestResponse) + assert manifest_resp.vault_id == vault_id + assert manifest_resp.items == {} + + # 6. /stats — same; row count is 0 because upsert_note was mocked. + stats_resp = await obsidian_stats( + vault_id=vault_id, user=db_user, session=db_session + ) + assert isinstance(stats_resp, StatsResponse) + assert stats_resp.vault_id == vault_id + assert stats_resp.files_synced == 0 + assert stats_resp.last_sync_at is None diff --git a/surfsense_obsidian/src/api-client.ts b/surfsense_obsidian/src/api-client.ts index 9ab626c37..62fcc55b3 100644 --- a/surfsense_obsidian/src/api-client.ts +++ b/surfsense_obsidian/src/api-client.ts @@ -1,11 +1,14 @@ import { Notice, requestUrl, type RequestUrlParam, type RequestUrlResponse } from "obsidian"; import type { ConnectResponse, + DeleteAck, HealthResponse, ManifestResponse, NotePayload, + RenameAck, RenameItem, SearchSpace, + SyncAck, } from "./types"; /** @@ -119,26 +122,31 @@ export class SurfSenseApiClient { ); } + /** POST /sync — `failed[]` are paths whose `status === "error"` for retry. 
*/
    async syncBatch(input: {
        vaultId: string;
        notes: NotePayload[];
-    }): Promise<{ accepted: number; rejected: string[] }> {
-        const resp = await this.request<{ accepted?: number; rejected?: string[] }>(
+    }): Promise<{ indexed: number; failed: string[] }> {
+        const resp = await this.request<SyncAck>(
            "POST",
            "/api/v1/obsidian/sync",
            { vault_id: input.vaultId, notes: input.notes }
        );
-        return {
-            accepted: typeof resp.accepted === "number" ? resp.accepted : input.notes.length,
-            rejected: Array.isArray(resp.rejected) ? resp.rejected : [],
-        };
+        const failed = resp.items
+            .filter((it) => it.status === "error")
+            .map((it) => it.path);
+        return { indexed: resp.indexed, failed };
    }

+    /** POST /rename — `"missing"` counts as success; only `"error"` is retried. */
    async renameBatch(input: {
        vaultId: string;
        renames: Pick<RenameItem, "oldPath" | "newPath">[];
-    }): Promise<{ renamed: number }> {
-        const resp = await this.request<{ renamed?: number }>(
+    }): Promise<{
+        renamed: number;
+        failed: Array<{ oldPath: string; newPath: string }>;
+    }> {
+        const resp = await this.request<RenameAck>(
            "POST",
            "/api/v1/obsidian/rename",
            {
                vault_id: input.vaultId,
                renames: input.renames.map((r) => ({
                    old_path: r.oldPath,
                    new_path: r.newPath,
                })),
            }
        );
-        return { renamed: typeof resp.renamed === "number" ? resp.renamed : 0 };
+        const failed = resp.items
+            .filter((it) => it.status === "error")
+            .map((it) => ({ oldPath: it.old_path, newPath: it.new_path }));
+        return { renamed: resp.renamed, failed };
    }

+    /** DELETE /notes — `"missing"` counts as success; only `"error"` is retried. */
    async deleteBatch(input: {
        vaultId: string;
        paths: string[];
-    }): Promise<{ deleted: number }> {
-        const resp = await this.request<{ deleted?: number }>(
+    }): Promise<{ deleted: number; failed: string[] }> {
+        const resp = await this.request<DeleteAck>(
            "DELETE",
            "/api/v1/obsidian/notes",
            { vault_id: input.vaultId, paths: input.paths }
        );
-        return { deleted: typeof resp.deleted === "number" ? resp.deleted : 0 };
+        const failed = resp.items
+            .filter((it) => it.status === "error")
+            .map((it) => it.path);
+        return { deleted: resp.deleted, failed };
    }

    async getManifest(vaultId: string): Promise<ManifestResponse> {
@@ -225,11 +240,16 @@
    }

function parseJson<T>(resp: RequestUrlResponse): T {
-    if (resp.text === undefined || resp.text === "") return undefined as unknown as T;
+    // Plugin endpoints always return JSON; non-JSON 2xx is usually a
+    // captive portal or CDN page — surface as transient so we back off.
+    const text = resp.text ?? "";
    try {
-        return JSON.parse(resp.text) as T;
+        return JSON.parse(text) as T;
    } catch {
-        return undefined as unknown as T;
+        throw new TransientError(
+            resp.status,
+            `Invalid JSON from server (got: ${text.slice(0, 80)})`
+        );
    }
}
diff --git a/surfsense_obsidian/src/sync-engine.ts b/surfsense_obsidian/src/sync-engine.ts
index 8021273da..af35763f0 100644
--- a/surfsense_obsidian/src/sync-engine.ts
+++ b/surfsense_obsidian/src/sync-engine.ts
@@ -246,13 +246,20 @@ export class SyncEngine {
        const dropped: QueueItem[] = [];

        // Renames first so paths line up server-side before content upserts.
+        // Per-item server errors go to retry; "missing" is treated as success.
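+        // (Failed renames are keyed as `oldPath\u0000newPath` below; NUL is a
+        // safe separator because it can never appear in a file path.)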
if (renames.length > 0) { try { - await this.deps.apiClient.renameBatch({ + const resp = await this.deps.apiClient.renameBatch({ vaultId: settings.vaultId, renames: renames.map((r) => ({ oldPath: r.oldPath, newPath: r.newPath })), }); - acked.push(...renames); + const failed = new Set( + resp.failed.map((f) => `${f.oldPath}\u0000${f.newPath}`), + ); + for (const r of renames) { + if (failed.has(`${r.oldPath}\u0000${r.newPath}`)) retry.push(r); + else acked.push(r); + } } catch (err) { if (await this.handleVaultNotRegistered(err)) { retry.push(...renames); @@ -267,11 +274,15 @@ export class SyncEngine { if (deletes.length > 0) { try { - await this.deps.apiClient.deleteBatch({ + const resp = await this.deps.apiClient.deleteBatch({ vaultId: settings.vaultId, paths: deletes.map((d) => d.path), }); - acked.push(...deletes); + const failed = new Set(resp.failed); + for (const d of deletes) { + if (failed.has(d.path)) retry.push(d); + else acked.push(d); + } } catch (err) { if (await this.handleVaultNotRegistered(err)) { retry.push(...deletes); @@ -310,10 +321,11 @@ export class SyncEngine { vaultId: settings.vaultId, notes: payloads, }); - const rejected = new Set(resp.rejected ?? []); + // Per-note failures retry; the queue's maxAttempts eventually drops poison pills. + const failed = new Set(resp.failed); for (const item of upserts) { if (retry.find((r) => r === item)) continue; - if (rejected.has(item.path)) dropped.push(item); + if (failed.has(item.path)) retry.push(item); else acked.push(item); } } catch (err) { diff --git a/surfsense_obsidian/src/types.ts b/surfsense_obsidian/src/types.ts index 6f7683830..9c0e49600 100644 --- a/surfsense_obsidian/src/types.ts +++ b/surfsense_obsidian/src/types.ts @@ -129,6 +129,49 @@ export interface ManifestResponse { [key: string]: unknown; } +/** Per-item ack shapes — mirror `app/schemas/obsidian_plugin.py` 1:1. */ +export interface SyncAckItem { + path: string; + status: "ok" | "error"; + document_id?: number; + error?: string; +} + +export interface SyncAck { + vault_id: string; + indexed: number; + failed: number; + items: SyncAckItem[]; +} + +export interface RenameAckItem { + old_path: string; + new_path: string; + status: "ok" | "error" | "missing"; + document_id?: number; + error?: string; +} + +export interface RenameAck { + vault_id: string; + renamed: number; + missing: number; + items: RenameAckItem[]; +} + +export interface DeleteAckItem { + path: string; + status: "ok" | "error" | "missing"; + error?: string; +} + +export interface DeleteAck { + vault_id: string; + deleted: number; + missing: number; + items: DeleteAckItem[]; +} + export type StatusKind = | "idle" | "syncing"