feat: deactivate legacy Obsidian connectors and implement partial unique index for improved upsert handling

This commit is contained in:
Anish Sarkar 2026-04-21 03:18:44 +05:30
parent 4d3406341d
commit 2d90ed0fec
8 changed files with 683 additions and 145 deletions

View file

@ -4,22 +4,11 @@ Revision ID: 129
Revises: 128 Revises: 128
Create Date: 2026-04-18 Create Date: 2026-04-18
Marks every pre-plugin OBSIDIAN_CONNECTOR row as legacy. We keep the Deactivates pre-plugin OBSIDIAN_CONNECTOR rows (keeping them and their
rows (and their indexed Documents) so existing search results don't Documents, but flagging ``config.legacy = true`` and disabling scheduling)
suddenly disappear, but we: and creates the partial unique index on
``(user_id, (config->>'vault_id'))`` for plugin-Obsidian rows that backs
* set ``is_indexable = false`` and ``periodic_indexing_enabled = false`` the ``/obsidian/connect`` upsert.
so the scheduler will never fire a server-side scan again,
* clear ``next_scheduled_at`` so the scheduler stops considering the
row,
* merge ``{"legacy": true, "deactivated_at": "<now>"}`` into ``config``
so the new ObsidianConfig view in the web UI can render the
migration banner (and so a future cleanup script can find them).
A row is "pre-plugin" when its ``config`` does not already have
``source = "plugin"``. The new plugin indexer always writes
``config.source = "plugin"`` on first /obsidian/connect, so this
predicate is stable.
""" """
from __future__ import annotations from __future__ import annotations
@ -60,9 +49,27 @@ def upgrade() -> None:
) )
) )
conn.execute(
sa.text(
"""
CREATE UNIQUE INDEX search_source_connectors_obsidian_plugin_vault_uniq
ON search_source_connectors (user_id, ((config->>'vault_id')))
WHERE connector_type = 'OBSIDIAN_CONNECTOR'
AND config->>'source' = 'plugin'
AND config->>'vault_id' IS NOT NULL
"""
)
)
def downgrade() -> None: def downgrade() -> None:
conn = op.get_bind() conn = op.get_bind()
conn.execute(
sa.text(
"DROP INDEX IF EXISTS "
"search_source_connectors_obsidian_plugin_vault_uniq"
)
)
conn.execute( conn.execute(
sa.text( sa.text(
""" """

View file

@ -1510,6 +1510,18 @@ class SearchSourceConnector(BaseModel, TimestampMixin):
"name", "name",
name="uq_searchspace_user_connector_type_name", name="uq_searchspace_user_connector_type_name",
), ),
# Mirrors migration 129; backs the ``/obsidian/connect`` upsert.
Index(
"search_source_connectors_obsidian_plugin_vault_uniq",
"user_id",
text("(config->>'vault_id')"),
unique=True,
postgresql_where=text(
"connector_type = 'OBSIDIAN_CONNECTOR' "
"AND config->>'source' = 'plugin' "
"AND config->>'vault_id' IS NOT NULL"
),
),
) )
name = Column(String(100), nullable=False, index=True) name = Column(String(100), nullable=False, index=True)

View file

@ -10,8 +10,10 @@ from __future__ import annotations
import logging import logging
from datetime import UTC, datetime from datetime import UTC, datetime
import sqlalchemy as sa
from fastapi import APIRouter, Depends, HTTPException, Query, status from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import and_, case, func from sqlalchemy import and_, case, func
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select from sqlalchemy.future import select
@ -27,10 +29,17 @@ from app.db import (
from app.schemas.obsidian_plugin import ( from app.schemas.obsidian_plugin import (
ConnectRequest, ConnectRequest,
ConnectResponse, ConnectResponse,
DeleteAck,
DeleteAckItem,
DeleteBatchRequest, DeleteBatchRequest,
HealthResponse, HealthResponse,
ManifestResponse, ManifestResponse,
RenameAck,
RenameAckItem,
RenameBatchRequest, RenameBatchRequest,
StatsResponse,
SyncAck,
SyncAckItem,
SyncBatchRequest, SyncBatchRequest,
) )
from app.services.obsidian_plugin_indexer import ( from app.services.obsidian_plugin_indexer import (
@ -66,13 +75,15 @@ async def _resolve_vault_connector(
vault_id: str, vault_id: str,
) -> SearchSourceConnector: ) -> SearchSourceConnector:
"""Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user.""" """Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user."""
# ``config`` is core ``JSON`` (not ``JSONB``); ``as_string()`` is the
# cross-dialect equivalent of ``.astext`` and compiles to ``->>``.
stmt = select(SearchSourceConnector).where( stmt = select(SearchSourceConnector).where(
and_( and_(
SearchSourceConnector.user_id == user.id, SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type SearchSourceConnector.connector_type
== SearchSourceConnectorType.OBSIDIAN_CONNECTOR, == SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
SearchSourceConnector.config["vault_id"].astext == vault_id, SearchSourceConnector.config["vault_id"].as_string() == vault_id,
SearchSourceConnector.config["source"].astext == "plugin", SearchSourceConnector.config["source"].as_string() == "plugin",
) )
) )
@ -139,36 +150,15 @@ async def obsidian_connect(
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
) -> ConnectResponse: ) -> ConnectResponse:
"""Register a vault, or return the existing connector row. """Register a vault, or refresh the existing connector row.
Idempotent on (user_id, OBSIDIAN_CONNECTOR, vault_id). Called on every Idempotent on ``(user_id, OBSIDIAN_CONNECTOR, vault_id)`` via the partial
plugin onload as a heartbeat. unique index from migration 129. Called on every plugin onload.
""" """
await _ensure_search_space_access( await _ensure_search_space_access(
session, user=user, search_space_id=payload.search_space_id session, user=user, search_space_id=payload.search_space_id
) )
# FOR UPDATE so concurrent /connect calls for the same vault can't race.
existing: SearchSourceConnector | None = (
(
await session.execute(
select(SearchSourceConnector)
.where(
and_(
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
SearchSourceConnector.config["vault_id"].astext
== payload.vault_id,
)
)
.with_for_update()
)
)
.scalars()
.first()
)
now_iso = datetime.now(UTC).isoformat() now_iso = datetime.now(UTC).isoformat()
cfg = { cfg = {
"vault_id": payload.vault_id, "vault_id": payload.vault_id,
@ -176,50 +166,68 @@ async def obsidian_connect(
"source": "plugin", "source": "plugin",
"last_connect_at": now_iso, "last_connect_at": now_iso,
} }
display_name = f"Obsidian \u2014 {payload.vault_name}"
if existing is not None: # ``index_elements`` + ``index_where`` matches the partial unique index
existing.config = cfg # by shape; ``ON CONFLICT ON CONSTRAINT`` doesn't work for partial indexes.
# Re-stamp on every connect so vault renames in Obsidian propagate; stmt = (
# the web UI hides the Name input for Obsidian connectors. pg_insert(SearchSourceConnector)
existing.name = f"Obsidian — {payload.vault_name}" .values(
existing.is_indexable = False name=display_name,
existing.search_space_id = payload.search_space_id
await session.commit()
await session.refresh(existing)
connector = existing
else:
connector = SearchSourceConnector(
name=f"Obsidian — {payload.vault_name}",
connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR, connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
is_indexable=False, is_indexable=False,
config=cfg, config=cfg,
user_id=user.id, user_id=user.id,
search_space_id=payload.search_space_id, search_space_id=payload.search_space_id,
) )
session.add(connector) .on_conflict_do_update(
await session.commit() index_elements=[
await session.refresh(connector) SearchSourceConnector.user_id,
sa.text("(config->>'vault_id')"),
],
index_where=sa.text(
"connector_type = 'OBSIDIAN_CONNECTOR' "
"AND config->>'source' = 'plugin' "
"AND config->>'vault_id' IS NOT NULL"
),
set_={
"name": display_name,
"config": cfg,
"search_space_id": payload.search_space_id,
"is_indexable": False,
},
)
.returning(SearchSourceConnector)
)
result = await session.execute(stmt)
connector = result.scalar_one()
# Read attrs before commit; ``expire_on_commit=True`` would force a
# lazy refresh that fails with ``MissingGreenlet`` during serialization.
connector_id = connector.id
connector_search_space_id = connector.search_space_id
await session.commit()
return ConnectResponse( return ConnectResponse(
connector_id=connector.id, connector_id=connector_id,
vault_id=payload.vault_id, vault_id=payload.vault_id,
search_space_id=connector.search_space_id, search_space_id=connector_search_space_id,
**_build_handshake(), **_build_handshake(),
) )
@router.post("/sync") @router.post("/sync", response_model=SyncAck)
async def obsidian_sync( async def obsidian_sync(
payload: SyncBatchRequest, payload: SyncBatchRequest,
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]: ) -> SyncAck:
"""Batch-upsert notes; returns per-note ack so the plugin can dequeue/retry.""" """Batch-upsert notes; returns per-note ack so the plugin can dequeue/retry."""
connector = await _resolve_vault_connector( connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id session, user=user, vault_id=payload.vault_id
) )
results: list[dict[str, object]] = [] items: list[SyncAckItem] = []
indexed = 0 indexed = 0
failed = 0 failed = 0
@ -229,8 +237,8 @@ async def obsidian_sync(
session, connector=connector, payload=note, user_id=str(user.id) session, connector=connector, payload=note, user_id=str(user.id)
) )
indexed += 1 indexed += 1
results.append( items.append(
{"path": note.path, "status": "ok", "document_id": doc.id} SyncAckItem(path=note.path, status="ok", document_id=doc.id)
) )
except HTTPException: except HTTPException:
raise raise
@ -241,30 +249,30 @@ async def obsidian_sync(
note.path, note.path,
payload.vault_id, payload.vault_id,
) )
results.append( items.append(
{"path": note.path, "status": "error", "error": str(exc)[:300]} SyncAckItem(path=note.path, status="error", error=str(exc)[:300])
) )
return { return SyncAck(
"vault_id": payload.vault_id, vault_id=payload.vault_id,
"indexed": indexed, indexed=indexed,
"failed": failed, failed=failed,
"results": results, items=items,
} )
@router.post("/rename") @router.post("/rename", response_model=RenameAck)
async def obsidian_rename( async def obsidian_rename(
payload: RenameBatchRequest, payload: RenameBatchRequest,
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]: ) -> RenameAck:
"""Apply a batch of vault rename events.""" """Apply a batch of vault rename events."""
connector = await _resolve_vault_connector( connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id session, user=user, vault_id=payload.vault_id
) )
results: list[dict[str, object]] = [] items: list[RenameAckItem] = []
renamed = 0 renamed = 0
missing = 0 missing = 0
@ -279,22 +287,22 @@ async def obsidian_rename(
) )
if doc is None: if doc is None:
missing += 1 missing += 1
results.append( items.append(
{ RenameAckItem(
"old_path": item.old_path, old_path=item.old_path,
"new_path": item.new_path, new_path=item.new_path,
"status": "missing", status="missing",
} )
) )
else: else:
renamed += 1 renamed += 1
results.append( items.append(
{ RenameAckItem(
"old_path": item.old_path, old_path=item.old_path,
"new_path": item.new_path, new_path=item.new_path,
"status": "ok", status="ok",
"document_id": doc.id, document_id=doc.id,
} )
) )
except Exception as exc: except Exception as exc:
logger.exception( logger.exception(
@ -303,29 +311,29 @@ async def obsidian_rename(
item.new_path, item.new_path,
payload.vault_id, payload.vault_id,
) )
results.append( items.append(
{ RenameAckItem(
"old_path": item.old_path, old_path=item.old_path,
"new_path": item.new_path, new_path=item.new_path,
"status": "error", status="error",
"error": str(exc)[:300], error=str(exc)[:300],
} )
) )
return { return RenameAck(
"vault_id": payload.vault_id, vault_id=payload.vault_id,
"renamed": renamed, renamed=renamed,
"missing": missing, missing=missing,
"results": results, items=items,
} )
@router.delete("/notes") @router.delete("/notes", response_model=DeleteAck)
async def obsidian_delete_notes( async def obsidian_delete_notes(
payload: DeleteBatchRequest, payload: DeleteBatchRequest,
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]: ) -> DeleteAck:
"""Soft-delete a batch of notes by vault-relative path.""" """Soft-delete a batch of notes by vault-relative path."""
connector = await _resolve_vault_connector( connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id session, user=user, vault_id=payload.vault_id
@ -333,7 +341,7 @@ async def obsidian_delete_notes(
deleted = 0 deleted = 0
missing = 0 missing = 0
results: list[dict[str, object]] = [] items: list[DeleteAckItem] = []
for path in payload.paths: for path in payload.paths:
try: try:
ok = await delete_note( ok = await delete_note(
@ -344,26 +352,26 @@ async def obsidian_delete_notes(
) )
if ok: if ok:
deleted += 1 deleted += 1
results.append({"path": path, "status": "ok"}) items.append(DeleteAckItem(path=path, status="ok"))
else: else:
missing += 1 missing += 1
results.append({"path": path, "status": "missing"}) items.append(DeleteAckItem(path=path, status="missing"))
except Exception as exc: except Exception as exc:
logger.exception( logger.exception(
"obsidian DELETE /notes failed for path=%s vault=%s", "obsidian DELETE /notes failed for path=%s vault=%s",
path, path,
payload.vault_id, payload.vault_id,
) )
results.append( items.append(
{"path": path, "status": "error", "error": str(exc)[:300]} DeleteAckItem(path=path, status="error", error=str(exc)[:300])
) )
return { return DeleteAck(
"vault_id": payload.vault_id, vault_id=payload.vault_id,
"deleted": deleted, deleted=deleted,
"missing": missing, missing=missing,
"results": results, items=items,
} )
@router.get("/manifest", response_model=ManifestResponse) @router.get("/manifest", response_model=ManifestResponse)
@ -379,12 +387,12 @@ async def obsidian_manifest(
return await get_manifest(session, connector=connector, vault_id=vault_id) return await get_manifest(session, connector=connector, vault_id=vault_id)
@router.get("/stats") @router.get("/stats", response_model=StatsResponse)
async def obsidian_stats( async def obsidian_stats(
vault_id: str = Query(..., description="Plugin-side stable vault UUID"), vault_id: str = Query(..., description="Plugin-side stable vault UUID"),
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]: ) -> StatsResponse:
"""Active-note count + last sync time for the web tile. """Active-note count + last sync time for the web tile.
``files_synced`` excludes tombstones so it matches ``/manifest``; ``files_synced`` excludes tombstones so it matches ``/manifest``;
@ -394,7 +402,7 @@ async def obsidian_stats(
session, user=user, vault_id=vault_id session, user=user, vault_id=vault_id
) )
is_active = Document.document_metadata["deleted_at"].astext.is_(None) is_active = Document.document_metadata["deleted_at"].as_string().is_(None)
row = ( row = (
await session.execute( await session.execute(
@ -410,8 +418,8 @@ async def obsidian_stats(
) )
).first() ).first()
return { return StatsResponse(
"vault_id": vault_id, vault_id=vault_id,
"files_synced": int(row[0] or 0), files_synced=int(row[0] or 0),
"last_sync_at": row[1].isoformat() if row[1] else None, last_sync_at=row[1],
} )

View file

@ -8,7 +8,7 @@ prefix (``/api/v2/...``).
from __future__ import annotations from __future__ import annotations
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
@ -109,3 +109,58 @@ class HealthResponse(_PluginBase):
capabilities: list[str] capabilities: list[str]
server_time_utc: datetime server_time_utc: datetime
# Per-item batch ack schemas — wire shape is load-bearing for the plugin
# queue (see api-client.ts / sync-engine.ts:processBatch).
class SyncAckItem(_PluginBase):
    """Per-note ack for ``POST /sync``; ``error`` items are retried client-side."""

    # Vault-relative note path, echoed back so the plugin can dequeue/retry it.
    path: str
    status: Literal["ok", "error"]
    # Server-side Document row id; populated when status == "ok".
    document_id: int | None = None
    # Truncated exception text (the route slices to 300 chars); "error" only.
    error: str | None = None
class SyncAck(_PluginBase):
    """Batch response for ``POST /sync``: summary counts plus per-note items."""

    vault_id: str
    # Count of notes successfully upserted in this batch.
    indexed: int
    # Count of notes whose upsert raised; details live in ``items``.
    failed: int
    items: list[SyncAckItem] = Field(default_factory=list)
class RenameAckItem(_PluginBase):
    """Per-rename ack for ``POST /rename``."""

    old_path: str
    new_path: str
    # ``missing`` is treated as success client-side (end state reached).
    status: Literal["ok", "error", "missing"]
    # Server-side Document row id; populated when status == "ok".
    document_id: int | None = None
    error: str | None = None
class RenameAck(_PluginBase):
    """Batch response for ``POST /rename``: summary counts plus per-item acks."""

    vault_id: str
    renamed: int
    # Renames whose old path had no matching Document (still a terminal state).
    missing: int
    items: list[RenameAckItem] = Field(default_factory=list)
class DeleteAckItem(_PluginBase):
    """Per-path ack for ``DELETE /notes``; ``missing`` counts as success."""

    path: str
    status: Literal["ok", "error", "missing"]
    error: str | None = None
class DeleteAck(_PluginBase):
    """Batch response for ``DELETE /notes``: summary counts plus per-path acks."""

    vault_id: str
    deleted: int
    # Paths that had no live Document to soft-delete (already-gone is fine).
    missing: int
    items: list[DeleteAckItem] = Field(default_factory=list)
class StatsResponse(_PluginBase):
    """Backs the Obsidian connector tile in the web UI."""

    vault_id: str
    # Active-note count; tombstoned notes are excluded (see obsidian_stats).
    files_synced: int
    last_sync_at: datetime | None = None

View file

@ -0,0 +1,381 @@
"""Integration tests for the Obsidian plugin HTTP wire contract.
Two concerns:
1. The /connect upsert really collapses concurrent first-time connects to
exactly one row. This locks the partial unique index in migration 129
to its purpose.
2. The end-to-end response shapes returned by /connect /sync /rename
/notes /manifest /stats match the schemas the plugin's TypeScript
decoders expect. Each new field renamed (results -> items, accepted ->
indexed, etc.) is a contract change, and a smoke pass like this is
the cheapest way to catch a future drift before it ships.
"""
from __future__ import annotations
import asyncio
import uuid
from datetime import UTC, datetime
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from sqlalchemy import func, select, text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import (
Document,
DocumentType,
SearchSourceConnector,
SearchSourceConnectorType,
SearchSpace,
User,
)
from app.routes.obsidian_plugin_routes import (
obsidian_connect,
obsidian_delete_notes,
obsidian_manifest,
obsidian_rename,
obsidian_stats,
obsidian_sync,
)
from app.schemas.obsidian_plugin import (
ConnectRequest,
DeleteAck,
DeleteBatchRequest,
ManifestResponse,
NotePayload,
RenameAck,
RenameBatchRequest,
RenameItem,
StatsResponse,
SyncAck,
SyncBatchRequest,
)
pytestmark = pytest.mark.integration
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_note_payload(vault_id: str, path: str, content_hash: str) -> NotePayload:
    """Build the smallest ``NotePayload`` the schema will accept.

    The indexer is mocked in these tests, so the field values never
    travel through the real pipeline; they only need to validate.
    """
    timestamp = datetime.now(UTC)
    filename = path.rsplit("/", 1)[-1]
    stem = filename.rsplit(".", 1)[0]
    return NotePayload(
        vault_id=vault_id,
        path=path,
        name=stem,
        extension="md",
        content="# Test\n\nbody",
        content_hash=content_hash,
        mtime=timestamp,
        ctime=timestamp,
    )
@pytest_asyncio.fixture
async def race_user_and_space(async_engine):
    """User + SearchSpace committed via the live engine so the two
    concurrent /connect sessions in the race test can both see them.

    We can't use the savepoint-trapped ``db_session`` fixture here
    because the concurrent sessions need to see committed rows.

    Yields:
        ``(user_id, space_id)`` for the freshly committed pair.
    """
    user_id = uuid.uuid4()
    # Setup session: commit so other sessions (on the same engine) see the rows.
    async with AsyncSession(async_engine) as setup:
        user = User(
            id=user_id,
            # Unique email per run so repeated test invocations can't collide.
            email=f"obsidian-race-{uuid.uuid4()}@surfsense.test",
            hashed_password="x",
            is_active=True,
            is_superuser=False,
            is_verified=True,
        )
        space = SearchSpace(name="Race Space", user_id=user_id)
        setup.add_all([user, space])
        await setup.commit()
        # Refresh to load the DB-generated primary key before the session closes.
        await setup.refresh(space)
        space_id = space.id

    yield user_id, space_id

    async with AsyncSession(async_engine) as cleanup:
        # Order matters: connectors -> space -> user, so FK constraints are
        # satisfied as we go. NOTE(review): only these three DELETEs are
        # issued here — presumably any Documents created by the tests are
        # removed via cascade on user_id; confirm against the schema.
        await cleanup.execute(
            text(
                'DELETE FROM search_source_connectors WHERE user_id = :uid'
            ),
            {"uid": user_id},
        )
        await cleanup.execute(
            text("DELETE FROM searchspaces WHERE id = :id"),
            {"id": space_id},
        )
        await cleanup.execute(
            text('DELETE FROM "user" WHERE id = :uid'),
            {"uid": user_id},
        )
        await cleanup.commit()
# ---------------------------------------------------------------------------
# /connect race + index enforcement
# ---------------------------------------------------------------------------
class TestConnectRace:
    """Locks the /connect upsert and the partial unique index to their purpose."""

    async def test_concurrent_first_connects_collapse_to_one_row(
        self, async_engine, race_user_and_space
    ):
        """Two simultaneous /connect calls for the same vault_id should
        produce exactly one row, not two. This relies on the partial
        unique index added in migration 129 plus the
        ON CONFLICT DO UPDATE in obsidian_connect."""
        user_id, space_id = race_user_and_space
        vault_id = str(uuid.uuid4())

        async def _call(name_suffix: str) -> None:
            # Each call gets its own session so the two connects really are
            # independent transactions, like two plugin instances would be.
            async with AsyncSession(async_engine) as s:
                fresh_user = await s.get(User, user_id)
                payload = ConnectRequest(
                    vault_id=vault_id,
                    # Distinct vault_name per call so the "last write wins"
                    # path of the upsert is exercised, not just a no-op.
                    vault_name=f"My Vault {name_suffix}",
                    search_space_id=space_id,
                )
                await obsidian_connect(payload, user=fresh_user, session=s)

        # return_exceptions=True so we can assert on failures instead of
        # letting gather abort on the first one.
        results = await asyncio.gather(
            _call("a"), _call("b"), return_exceptions=True
        )
        # Both calls should succeed (ON CONFLICT collapses, doesn't raise).
        for r in results:
            assert not isinstance(r, Exception), f"Connect raised: {r!r}"

        # The fixture creates a fresh user per test, so a count scoped to
        # ``user_id`` is equivalent to "rows for this vault" without
        # needing the JSON-path filter (which only works against a real
        # postgres dialect at compile time, not against a bare model
        # expression in a test).
        async with AsyncSession(async_engine) as verify:
            count = (
                await verify.execute(
                    select(func.count(SearchSourceConnector.id)).where(
                        SearchSourceConnector.user_id == user_id,
                    )
                )
            ).scalar_one()
        assert count == 1

    async def test_partial_unique_index_blocks_raw_duplicate(
        self, async_engine, race_user_and_space
    ):
        """If the partial unique index were missing, two raw INSERTs of
        plugin-Obsidian rows for the same (user_id, vault_id) would both
        succeed. With the index in place the second one must raise
        IntegrityError. This guards the schema regardless of whether the
        route logic is correct."""
        user_id, space_id = race_user_and_space
        vault_id = str(uuid.uuid4())

        # First insert: must succeed (no duplicate exists yet).
        async with AsyncSession(async_engine) as s:
            s.add(
                SearchSourceConnector(
                    name="Obsidian \u2014 First",
                    connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
                    is_indexable=False,
                    # config.source == "plugin" puts the row inside the
                    # partial index's WHERE clause.
                    config={
                        "vault_id": vault_id,
                        "vault_name": "First",
                        "source": "plugin",
                    },
                    user_id=user_id,
                    search_space_id=space_id,
                )
            )
            await s.commit()

        # Second insert with the same (user_id, vault_id): the unique index
        # must reject it at commit time.
        with pytest.raises(IntegrityError):
            async with AsyncSession(async_engine) as s:
                s.add(
                    SearchSourceConnector(
                        name="Obsidian \u2014 Second",
                        connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
                        is_indexable=False,
                        config={
                            "vault_id": vault_id,
                            "vault_name": "Second",
                            "source": "plugin",
                        },
                        user_id=user_id,
                        search_space_id=space_id,
                    )
                )
                await s.commit()
# ---------------------------------------------------------------------------
# Combined wire-shape smoke test
# ---------------------------------------------------------------------------
class TestWireContractSmoke:
    """Walks /connect -> /sync -> /rename -> /notes -> /manifest -> /stats
    sequentially and asserts each response matches the new schema. With
    `response_model=` on every route, FastAPI is already validating the
    shape on real traffic; this test mainly guards against accidental
    field renames the way the TypeScript decoder would catch them."""

    async def test_full_flow_returns_typed_payloads(
        self, db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
    ):
        vault_id = str(uuid.uuid4())

        # 1. /connect — registers the vault; later steps resolve it by vault_id.
        connect_resp = await obsidian_connect(
            ConnectRequest(
                vault_id=vault_id,
                vault_name="Smoke Vault",
                search_space_id=db_search_space.id,
            ),
            user=db_user,
            session=db_session,
        )
        assert connect_resp.connector_id > 0
        assert connect_resp.vault_id == vault_id
        assert "sync" in connect_resp.capabilities

        # 2. /sync — stub the indexer so the call doesn't drag the LLM /
        # embedding pipeline in. We're testing the wire contract, not the
        # indexer itself.
        fake_doc = type("FakeDoc", (), {"id": 12345})()
        with patch(
            "app.routes.obsidian_plugin_routes.upsert_note",
            new=AsyncMock(return_value=fake_doc),
        ):
            sync_resp = await obsidian_sync(
                SyncBatchRequest(
                    vault_id=vault_id,
                    notes=[
                        _make_note_payload(vault_id, "ok.md", "hash-ok"),
                        _make_note_payload(vault_id, "fail.md", "hash-fail"),
                    ],
                ),
                user=db_user,
                session=db_session,
            )
        assert isinstance(sync_resp, SyncAck)
        assert sync_resp.vault_id == vault_id
        assert sync_resp.indexed == 2
        assert sync_resp.failed == 0
        assert len(sync_resp.items) == 2
        assert all(it.status == "ok" for it in sync_resp.items)
        # The TypeScript decoder filters on items[].status === "error" and
        # extracts .path, so confirm both fields are present and named.
        assert {it.path for it in sync_resp.items} == {"ok.md", "fail.md"}

        # 2b. Re-run /sync but force the indexer to raise on one note so
        # the per-item failure decoder gets exercised end-to-end.
        async def _selective_upsert(session, *, connector, payload, user_id):
            if payload.path == "fail.md":
                raise RuntimeError("simulated indexing failure")
            return fake_doc

        with patch(
            "app.routes.obsidian_plugin_routes.upsert_note",
            new=AsyncMock(side_effect=_selective_upsert),
        ):
            sync_resp = await obsidian_sync(
                SyncBatchRequest(
                    vault_id=vault_id,
                    notes=[
                        _make_note_payload(vault_id, "ok.md", "h1"),
                        _make_note_payload(vault_id, "fail.md", "h2"),
                    ],
                ),
                user=db_user,
                session=db_session,
            )
        assert sync_resp.indexed == 1
        assert sync_resp.failed == 1
        statuses = {it.path: it.status for it in sync_resp.items}
        assert statuses == {"ok.md": "ok", "fail.md": "error"}

        # 3. /rename — patch rename_note so we don't need a real Document.
        async def _rename(*args, **kwargs) -> object:
            # None models "document not found", exercising the "missing" branch.
            if kwargs.get("old_path") == "missing.md":
                return None
            return fake_doc

        with patch(
            "app.routes.obsidian_plugin_routes.rename_note",
            new=AsyncMock(side_effect=_rename),
        ):
            rename_resp = await obsidian_rename(
                RenameBatchRequest(
                    vault_id=vault_id,
                    renames=[
                        RenameItem(old_path="a.md", new_path="b.md"),
                        RenameItem(old_path="missing.md", new_path="x.md"),
                    ],
                ),
                user=db_user,
                session=db_session,
            )
        assert isinstance(rename_resp, RenameAck)
        assert rename_resp.renamed == 1
        assert rename_resp.missing == 1
        assert {it.status for it in rename_resp.items} == {"ok", "missing"}
        # snake_case fields are deliberate — the plugin decoder maps them
        # to camelCase explicitly.
        assert all(
            it.old_path and it.new_path for it in rename_resp.items
        )

        # 4. /notes DELETE — False from delete_note models an unknown path.
        async def _delete(*args, **kwargs) -> bool:
            return kwargs.get("path") != "ghost.md"

        with patch(
            "app.routes.obsidian_plugin_routes.delete_note",
            new=AsyncMock(side_effect=_delete),
        ):
            delete_resp = await obsidian_delete_notes(
                DeleteBatchRequest(vault_id=vault_id, paths=["b.md", "ghost.md"]),
                user=db_user,
                session=db_session,
            )
        assert isinstance(delete_resp, DeleteAck)
        assert delete_resp.deleted == 1
        assert delete_resp.missing == 1
        assert {it.path: it.status for it in delete_resp.items} == {
            "b.md": "ok",
            "ghost.md": "missing",
        }

        # 5. /manifest — empty (no real Documents were created because
        # upsert_note was mocked) but the response shape is what we care
        # about.
        manifest_resp = await obsidian_manifest(
            vault_id=vault_id, user=db_user, session=db_session
        )
        assert isinstance(manifest_resp, ManifestResponse)
        assert manifest_resp.vault_id == vault_id
        assert manifest_resp.items == {}

        # 6. /stats — same; row count is 0 because upsert_note was mocked.
        stats_resp = await obsidian_stats(
            vault_id=vault_id, user=db_user, session=db_session
        )
        assert isinstance(stats_resp, StatsResponse)
        assert stats_resp.vault_id == vault_id
        assert stats_resp.files_synced == 0
        assert stats_resp.last_sync_at is None

View file

@ -1,11 +1,14 @@
import { Notice, requestUrl, type RequestUrlParam, type RequestUrlResponse } from "obsidian"; import { Notice, requestUrl, type RequestUrlParam, type RequestUrlResponse } from "obsidian";
import type { import type {
ConnectResponse, ConnectResponse,
DeleteAck,
HealthResponse, HealthResponse,
ManifestResponse, ManifestResponse,
NotePayload, NotePayload,
RenameAck,
RenameItem, RenameItem,
SearchSpace, SearchSpace,
SyncAck,
} from "./types"; } from "./types";
/** /**
@ -119,26 +122,31 @@ export class SurfSenseApiClient {
); );
} }
/** POST /sync — `failed[]` are paths whose `status === "error"` for retry. */
async syncBatch(input: { async syncBatch(input: {
vaultId: string; vaultId: string;
notes: NotePayload[]; notes: NotePayload[];
}): Promise<{ accepted: number; rejected: string[] }> { }): Promise<{ indexed: number; failed: string[] }> {
const resp = await this.request<{ accepted?: number; rejected?: string[] }>( const resp = await this.request<SyncAck>(
"POST", "POST",
"/api/v1/obsidian/sync", "/api/v1/obsidian/sync",
{ vault_id: input.vaultId, notes: input.notes } { vault_id: input.vaultId, notes: input.notes }
); );
return { const failed = resp.items
accepted: typeof resp.accepted === "number" ? resp.accepted : input.notes.length, .filter((it) => it.status === "error")
rejected: Array.isArray(resp.rejected) ? resp.rejected : [], .map((it) => it.path);
}; return { indexed: resp.indexed, failed };
} }
/** POST /rename — `"missing"` counts as success; only `"error"` is retried. */
async renameBatch(input: { async renameBatch(input: {
vaultId: string; vaultId: string;
renames: Pick<RenameItem, "oldPath" | "newPath">[]; renames: Pick<RenameItem, "oldPath" | "newPath">[];
}): Promise<{ renamed: number }> { }): Promise<{
const resp = await this.request<{ renamed?: number }>( renamed: number;
failed: Array<{ oldPath: string; newPath: string }>;
}> {
const resp = await this.request<RenameAck>(
"POST", "POST",
"/api/v1/obsidian/rename", "/api/v1/obsidian/rename",
{ {
@ -149,19 +157,26 @@ export class SurfSenseApiClient {
})), })),
} }
); );
return { renamed: typeof resp.renamed === "number" ? resp.renamed : 0 }; const failed = resp.items
.filter((it) => it.status === "error")
.map((it) => ({ oldPath: it.old_path, newPath: it.new_path }));
return { renamed: resp.renamed, failed };
} }
/** DELETE /notes — `"missing"` counts as success; only `"error"` is retried. */
async deleteBatch(input: { async deleteBatch(input: {
vaultId: string; vaultId: string;
paths: string[]; paths: string[];
}): Promise<{ deleted: number }> { }): Promise<{ deleted: number; failed: string[] }> {
const resp = await this.request<{ deleted?: number }>( const resp = await this.request<DeleteAck>(
"DELETE", "DELETE",
"/api/v1/obsidian/notes", "/api/v1/obsidian/notes",
{ vault_id: input.vaultId, paths: input.paths } { vault_id: input.vaultId, paths: input.paths }
); );
return { deleted: typeof resp.deleted === "number" ? resp.deleted : 0 }; const failed = resp.items
.filter((it) => it.status === "error")
.map((it) => it.path);
return { deleted: resp.deleted, failed };
} }
async getManifest(vaultId: string): Promise<ManifestResponse> { async getManifest(vaultId: string): Promise<ManifestResponse> {
@ -225,11 +240,16 @@ export class SurfSenseApiClient {
} }
function parseJson<T>(resp: RequestUrlResponse): T { function parseJson<T>(resp: RequestUrlResponse): T {
if (resp.text === undefined || resp.text === "") return undefined as unknown as T; // Plugin endpoints always return JSON; non-JSON 2xx is usually a
// captive portal or CDN page — surface as transient so we back off.
const text = resp.text ?? "";
try { try {
return JSON.parse(resp.text) as T; return JSON.parse(text) as T;
} catch { } catch {
return undefined as unknown as T; throw new TransientError(
resp.status,
`Invalid JSON from server (got: ${text.slice(0, 80)})`
);
} }
} }

View file

@ -246,13 +246,20 @@ export class SyncEngine {
const dropped: QueueItem[] = []; const dropped: QueueItem[] = [];
// Renames first so paths line up server-side before content upserts. // Renames first so paths line up server-side before content upserts.
// Per-item server errors go to retry; "missing" is treated as success.
if (renames.length > 0) { if (renames.length > 0) {
try { try {
await this.deps.apiClient.renameBatch({ const resp = await this.deps.apiClient.renameBatch({
vaultId: settings.vaultId, vaultId: settings.vaultId,
renames: renames.map((r) => ({ oldPath: r.oldPath, newPath: r.newPath })), renames: renames.map((r) => ({ oldPath: r.oldPath, newPath: r.newPath })),
}); });
acked.push(...renames); const failed = new Set(
resp.failed.map((f) => `${f.oldPath}\u0000${f.newPath}`),
);
for (const r of renames) {
if (failed.has(`${r.oldPath}\u0000${r.newPath}`)) retry.push(r);
else acked.push(r);
}
} catch (err) { } catch (err) {
if (await this.handleVaultNotRegistered(err)) { if (await this.handleVaultNotRegistered(err)) {
retry.push(...renames); retry.push(...renames);
@ -267,11 +274,15 @@ export class SyncEngine {
if (deletes.length > 0) { if (deletes.length > 0) {
try { try {
await this.deps.apiClient.deleteBatch({ const resp = await this.deps.apiClient.deleteBatch({
vaultId: settings.vaultId, vaultId: settings.vaultId,
paths: deletes.map((d) => d.path), paths: deletes.map((d) => d.path),
}); });
acked.push(...deletes); const failed = new Set(resp.failed);
for (const d of deletes) {
if (failed.has(d.path)) retry.push(d);
else acked.push(d);
}
} catch (err) { } catch (err) {
if (await this.handleVaultNotRegistered(err)) { if (await this.handleVaultNotRegistered(err)) {
retry.push(...deletes); retry.push(...deletes);
@ -310,10 +321,11 @@ export class SyncEngine {
vaultId: settings.vaultId, vaultId: settings.vaultId,
notes: payloads, notes: payloads,
}); });
const rejected = new Set(resp.rejected ?? []); // Per-note failures retry; the queue's maxAttempts eventually drops poison pills.
const failed = new Set(resp.failed);
for (const item of upserts) { for (const item of upserts) {
if (retry.find((r) => r === item)) continue; if (retry.find((r) => r === item)) continue;
if (rejected.has(item.path)) dropped.push(item); if (failed.has(item.path)) retry.push(item);
else acked.push(item); else acked.push(item);
} }
} catch (err) { } catch (err) {

View file

@ -129,6 +129,49 @@ export interface ManifestResponse {
[key: string]: unknown; [key: string]: unknown;
} }
/** Per-item ack shapes — mirror `app/schemas/obsidian_plugin.py` 1:1. */
/** Per-note ack for `POST /sync`; items with `status === "error"` are retried. */
export interface SyncAckItem {
	/** Vault-relative note path, echoed back for dequeue/retry bookkeeping. */
	path: string;
	status: "ok" | "error";
	/** Server-side document row id; present when `status === "ok"`. */
	document_id?: number;
	error?: string;
}

/** Batch response for `POST /sync`: summary counts plus per-note items. */
export interface SyncAck {
	vault_id: string;
	indexed: number;
	failed: number;
	items: SyncAckItem[];
}

/**
 * Per-rename ack for `POST /rename`. Fields are snake_case on the wire;
 * the API client maps them to camelCase explicitly.
 */
export interface RenameAckItem {
	old_path: string;
	new_path: string;
	/** `"missing"` is treated as success (the desired end state is reached). */
	status: "ok" | "error" | "missing";
	document_id?: number;
	error?: string;
}

/** Batch response for `POST /rename`. */
export interface RenameAck {
	vault_id: string;
	renamed: number;
	missing: number;
	items: RenameAckItem[];
}

/** Per-path ack for `DELETE /notes`; `"missing"` counts as success. */
export interface DeleteAckItem {
	path: string;
	status: "ok" | "error" | "missing";
	error?: string;
}

/** Batch response for `DELETE /notes`. */
export interface DeleteAck {
	vault_id: string;
	deleted: number;
	missing: number;
	items: DeleteAckItem[];
}
export type StatusKind = export type StatusKind =
| "idle" | "idle"
| "syncing" | "syncing"