# ---------------------------------------------------------------------------
# file: surfsense_backend/alembic/versions/
#       129_deactivate_legacy_obsidian_connectors.py   (new file)
# ---------------------------------------------------------------------------

"""129_deactivate_legacy_obsidian_connectors

Revision ID: 129
Revises: 128
Create Date: 2026-04-18

Marks every pre-plugin OBSIDIAN_CONNECTOR row as legacy. We keep the
rows (and their indexed Documents) so existing search results don't
suddenly disappear, but we:

* set ``is_indexable = false`` and ``periodic_indexing_enabled = false``
  so the scheduler will never fire a server-side scan again,
* clear ``next_scheduled_at`` so the scheduler stops considering the
  row,
* merge ``{"legacy": true, "deactivated_at": "<UTC ISO-8601 timestamp>"}``
  into ``config`` so the new ObsidianConfig view in the web UI can
  render the migration banner (and so a future cleanup script can find
  them).

A row is "pre-plugin" when its ``config`` does not already have
``source = "plugin"``. The new plugin indexer always writes
``config.source = "plugin"`` on first /obsidian/connect, so this
predicate is stable.
"""

from __future__ import annotations

from collections.abc import Sequence

import sqlalchemy as sa

from alembic import op

revision: str = "129"
down_revision: str | None = "128"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Tag every pre-plugin Obsidian connector row and stop its scheduling.

    Raw SQL (not ORM) so the migration does not depend on current model
    definitions. The jsonb merge result is cast back to ``json`` to
    match the column type that ``downgrade()`` assumes (it casts
    ``config::jsonb`` and back) — jsonb->json is not an implicit
    assignment cast in PostgreSQL, so leaving the expression as jsonb
    would be inconsistent with the downgrade path.
    """
    conn = op.get_bind()
    conn.execute(
        sa.text(
            """
            UPDATE search_source_connectors
            SET
                is_indexable = false,
                periodic_indexing_enabled = false,
                next_scheduled_at = NULL,
                config = (
                    COALESCE(config, '{}'::json)::jsonb
                    || jsonb_build_object(
                        'legacy', true,
                        'deactivated_at', to_char(
                            now() AT TIME ZONE 'UTC',
                            'YYYY-MM-DD"T"HH24:MI:SS"Z"'
                        )
                    )
                )::json
            WHERE connector_type = 'OBSIDIAN_CONNECTOR'
              AND COALESCE((config::jsonb)->>'source', '') <> 'plugin'
            """
        )
    )


def downgrade() -> None:
    """Remove the legacy markers added by :func:`upgrade`.

    NOTE: deliberately does NOT restore ``is_indexable`` /
    ``periodic_indexing_enabled`` / ``next_scheduled_at`` — the
    pre-migration values were not recorded, so blindly re-enabling them
    could resurrect server-side scans the operator never wanted. Only
    the config keys this migration introduced are dropped.
    """
    conn = op.get_bind()
    conn.execute(
        sa.text(
            """
            UPDATE search_source_connectors
            SET config = (config::jsonb - 'legacy' - 'deactivated_at')::json
            WHERE connector_type = 'OBSIDIAN_CONNECTOR'
              AND (config::jsonb) ? 'legacy'
            """
        )
    )


# ---------------------------------------------------------------------------
# patch hunk: surfsense_backend/app/routes/__init__.py
#   added import:
#     from .obsidian_plugin_routes import router as obsidian_plugin_router
#   added registration (after onedrive connector router):
#     router.include_router(obsidian_plugin_router)  # Obsidian plugin push API
# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# file: surfsense_backend/app/routes/obsidian_plugin_routes.py (new file)
# — module continues below.
# ---------------------------------------------------------------------------
+ +Endpoints +--------- + +- ``GET /api/v1/obsidian/health`` — version handshake +- ``POST /api/v1/obsidian/connect`` — register or get a vault row +- ``POST /api/v1/obsidian/sync`` — batch upsert +- ``POST /api/v1/obsidian/rename`` — batch rename +- ``DELETE /api/v1/obsidian/notes`` — batch soft-delete +- ``GET /api/v1/obsidian/manifest`` — reconcile manifest + +Auth contract +------------- + +Every endpoint requires ``Depends(current_active_user)`` — the same JWT +bearer the rest of the API uses; future PAT migration is transparent. + +API stability is provided by the ``/api/v1/...`` URL prefix and the +``capabilities`` array advertised on ``/health`` (additive only). There +is no plugin-version gate; "your plugin is out of date" notices are +delegated to Obsidian's built-in community-store updater. +""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import and_ +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + SearchSpace, + User, + get_async_session, +) +from app.schemas.obsidian_plugin import ( + ConnectRequest, + ConnectResponse, + DeleteBatchRequest, + HealthResponse, + ManifestResponse, + RenameBatchRequest, + SyncBatchRequest, +) +from app.services.obsidian_plugin_indexer import ( + delete_note, + get_manifest, + rename_note, + upsert_note, +) +from app.users import current_active_user + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/obsidian", tags=["obsidian-plugin"]) + + +# Bumped manually whenever the wire contract gains a non-additive change. +# Additive (extra='ignore'-safe) changes do NOT bump this. +OBSIDIAN_API_VERSION = "1" + +# Capabilities advertised on /health and /connect. Plugins use this list +# for feature gating ("does this server understand attachments_v2?"). 
Add +# new strings, never rename/remove existing ones — older plugins ignore +# unknown entries safely. +OBSIDIAN_CAPABILITIES: list[str] = ["sync", "rename", "delete", "manifest"] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _build_handshake() -> dict[str, object]: + return { + "api_version": OBSIDIAN_API_VERSION, + "capabilities": list(OBSIDIAN_CAPABILITIES), + } + + +async def _resolve_vault_connector( + session: AsyncSession, + *, + user: User, + vault_id: str, +) -> SearchSourceConnector: + """Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user. + + Looked up by the (user_id, connector_type, config['vault_id']) tuple + so users can have multiple vaults each backed by its own connector + row (one per search space). + """ + result = await session.execute( + select(SearchSourceConnector).where( + and_( + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.OBSIDIAN_CONNECTOR, + ) + ) + ) + candidates = result.scalars().all() + for connector in candidates: + cfg = connector.config or {} + if cfg.get("vault_id") == vault_id and cfg.get("source") == "plugin": + return connector + + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "code": "VAULT_NOT_REGISTERED", + "message": ( + "No Obsidian plugin connector found for this vault. " + "Call POST /obsidian/connect first." + ), + "vault_id": vault_id, + }, + ) + + +async def _ensure_search_space_access( + session: AsyncSession, + *, + user: User, + search_space_id: int, +) -> SearchSpace: + """Confirm the user owns the requested search space. + + Plugin currently does not support shared search spaces (RBAC roles) + — that's a follow-up. Restricting to owner-only here keeps the + surface narrow and avoids leaking other members' connectors. 
+ """ + result = await session.execute( + select(SearchSpace).where( + and_(SearchSpace.id == search_space_id, SearchSpace.user_id == user.id) + ) + ) + space = result.scalars().first() + if space is None: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "code": "SEARCH_SPACE_FORBIDDEN", + "message": "You don't own that search space.", + }, + ) + return space + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + + +@router.get("/health", response_model=HealthResponse) +async def obsidian_health( + user: User = Depends(current_active_user), +) -> HealthResponse: + """Return the API contract handshake. + + The plugin calls this once per ``onload`` and caches the result for + capability-gating decisions. + """ + return HealthResponse( + **_build_handshake(), + server_time_utc=datetime.now(UTC), + ) + + +@router.post("/connect", response_model=ConnectResponse) +async def obsidian_connect( + payload: ConnectRequest, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> ConnectResponse: + """Register a vault, or return the existing connector row. + + Idempotent on the (user_id, OBSIDIAN_CONNECTOR, vault_id) tuple so + re-installing the plugin or reconnecting from a new device picks up + the same connector — and therefore the same documents. 
+ """ + await _ensure_search_space_access( + session, user=user, search_space_id=payload.search_space_id + ) + + result = await session.execute( + select(SearchSourceConnector).where( + and_( + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.OBSIDIAN_CONNECTOR, + ) + ) + ) + existing: SearchSourceConnector | None = None + for candidate in result.scalars().all(): + cfg = candidate.config or {} + if cfg.get("vault_id") == payload.vault_id: + existing = candidate + break + + now_iso = datetime.now(UTC).isoformat() + + if existing is not None: + cfg = dict(existing.config or {}) + cfg.update( + { + "vault_id": payload.vault_id, + "vault_name": payload.vault_name, + "source": "plugin", + "plugin_version": payload.plugin_version, + "device_id": payload.device_id, + "last_connect_at": now_iso, + } + ) + if payload.device_label: + cfg["device_label"] = payload.device_label + cfg.pop("legacy", None) + cfg.pop("vault_path", None) + existing.config = cfg + existing.is_indexable = False + existing.search_space_id = payload.search_space_id + await session.commit() + await session.refresh(existing) + connector = existing + else: + connector = SearchSourceConnector( + name=f"Obsidian — {payload.vault_name}", + connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR, + is_indexable=False, + config={ + "vault_id": payload.vault_id, + "vault_name": payload.vault_name, + "source": "plugin", + "plugin_version": payload.plugin_version, + "device_id": payload.device_id, + "device_label": payload.device_label, + "files_synced": 0, + "last_connect_at": now_iso, + }, + user_id=user.id, + search_space_id=payload.search_space_id, + ) + session.add(connector) + await session.commit() + await session.refresh(connector) + + return ConnectResponse( + connector_id=connector.id, + vault_id=payload.vault_id, + search_space_id=connector.search_space_id, + **_build_handshake(), + ) + + +@router.post("/sync") +async def obsidian_sync( 
+ payload: SyncBatchRequest, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> dict[str, object]: + """Batch-upsert notes pushed by the plugin. + + Returns per-note ack so the plugin can dequeue successes and retry + failures. + """ + connector = await _resolve_vault_connector( + session, user=user, vault_id=payload.vault_id + ) + + results: list[dict[str, object]] = [] + indexed = 0 + failed = 0 + + for note in payload.notes: + try: + doc = await upsert_note( + session, connector=connector, payload=note, user_id=str(user.id) + ) + indexed += 1 + results.append( + {"path": note.path, "status": "ok", "document_id": doc.id} + ) + except HTTPException: + raise + except Exception as exc: + failed += 1 + logger.exception( + "obsidian /sync failed for path=%s vault=%s", + note.path, + payload.vault_id, + ) + results.append( + {"path": note.path, "status": "error", "error": str(exc)[:300]} + ) + + cfg = dict(connector.config or {}) + cfg["last_sync_at"] = datetime.now(UTC).isoformat() + cfg["files_synced"] = int(cfg.get("files_synced", 0)) + indexed + connector.config = cfg + await session.commit() + + return { + "vault_id": payload.vault_id, + "indexed": indexed, + "failed": failed, + "results": results, + } + + +@router.post("/rename") +async def obsidian_rename( + payload: RenameBatchRequest, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> dict[str, object]: + """Apply a batch of vault rename events.""" + connector = await _resolve_vault_connector( + session, user=user, vault_id=payload.vault_id + ) + + results: list[dict[str, object]] = [] + renamed = 0 + missing = 0 + + for item in payload.renames: + try: + doc = await rename_note( + session, + connector=connector, + old_path=item.old_path, + new_path=item.new_path, + vault_id=payload.vault_id, + ) + if doc is None: + missing += 1 + results.append( + { + "old_path": item.old_path, + "new_path": 
item.new_path, + "status": "missing", + } + ) + else: + renamed += 1 + results.append( + { + "old_path": item.old_path, + "new_path": item.new_path, + "status": "ok", + "document_id": doc.id, + } + ) + except Exception as exc: + logger.exception( + "obsidian /rename failed for old=%s new=%s vault=%s", + item.old_path, + item.new_path, + payload.vault_id, + ) + results.append( + { + "old_path": item.old_path, + "new_path": item.new_path, + "status": "error", + "error": str(exc)[:300], + } + ) + + return { + "vault_id": payload.vault_id, + "renamed": renamed, + "missing": missing, + "results": results, + } + + +@router.delete("/notes") +async def obsidian_delete_notes( + payload: DeleteBatchRequest, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> dict[str, object]: + """Soft-delete a batch of notes by vault-relative path.""" + connector = await _resolve_vault_connector( + session, user=user, vault_id=payload.vault_id + ) + + deleted = 0 + missing = 0 + results: list[dict[str, object]] = [] + for path in payload.paths: + try: + ok = await delete_note( + session, + connector=connector, + vault_id=payload.vault_id, + path=path, + ) + if ok: + deleted += 1 + results.append({"path": path, "status": "ok"}) + else: + missing += 1 + results.append({"path": path, "status": "missing"}) + except Exception as exc: + logger.exception( + "obsidian DELETE /notes failed for path=%s vault=%s", + path, + payload.vault_id, + ) + results.append( + {"path": path, "status": "error", "error": str(exc)[:300]} + ) + + return { + "vault_id": payload.vault_id, + "deleted": deleted, + "missing": missing, + "results": results, + } + + +@router.get("/manifest", response_model=ManifestResponse) +async def obsidian_manifest( + vault_id: str = Query(..., description="Plugin-side stable vault UUID"), + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> ManifestResponse: + """Return the 
server-side ``{path: {hash, mtime}}`` manifest. + + Used by the plugin's ``onload`` reconcile to find files that were + edited or deleted while the plugin was offline. + """ + connector = await _resolve_vault_connector( + session, user=user, vault_id=vault_id + ) + return await get_manifest(session, connector=connector, vault_id=vault_id) diff --git a/surfsense_backend/app/schemas/obsidian_plugin.py b/surfsense_backend/app/schemas/obsidian_plugin.py new file mode 100644 index 000000000..c4c3cd8d4 --- /dev/null +++ b/surfsense_backend/app/schemas/obsidian_plugin.py @@ -0,0 +1,147 @@ +""" +Obsidian Plugin connector schemas. + +Wire format spoken between the SurfSense Obsidian plugin +(``surfsense_obsidian/``) and the FastAPI backend. + +Stability contract +------------------ +Every request and response schema sets ``model_config = ConfigDict(extra='ignore')``. +This is the API stability contract — not just hygiene: + +- Old plugins talking to a newer backend silently drop any new response fields + they don't understand instead of failing validation. +- New plugins talking to an older backend can include forward-looking request + fields (e.g. attachments metadata) without the older backend rejecting them. + +Hard breaking changes are reserved for the URL prefix (``/api/v2/...``). +Additive evolution is signaled via the ``capabilities`` array on +``HealthResponse`` / ``ConnectResponse`` — older plugins ignore unknown +capability strings safely. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + +_PLUGIN_MODEL_CONFIG = ConfigDict(extra="ignore") + + +class _PluginBase(BaseModel): + """Base class for all plugin payload schemas. + + Carries the forward-compatibility config so subclasses don't have to + repeat it. + """ + + model_config = _PLUGIN_MODEL_CONFIG + + +class NotePayload(_PluginBase): + """One Obsidian note as pushed by the plugin. 
class NotePayload(_PluginBase):
    """A single Obsidian note as pushed by the plugin.

    The plugin is the source of truth: ``content`` is the post-frontmatter
    body, while ``frontmatter``/``tags``/``headings``/links/etc. arrive
    precomputed (via ``app.metadataCache`` on the plugin side) so the
    backend never has to re-parse markdown.
    """

    vault_id: str = Field(..., description="Stable plugin-generated UUID for this vault")
    path: str = Field(..., description="Vault-relative path, e.g. 'notes/foo.md'")
    name: str = Field(..., description="File stem (no extension)")
    extension: str = Field(default="md", description="File extension without leading dot")
    content: str = Field(default="", description="Raw markdown body (post-frontmatter)")

    # Metadata extracted by the plugin; all optional and defaulted so a
    # minimal payload (path + content + hash + timestamps) still validates.
    frontmatter: dict[str, Any] = Field(default_factory=dict)
    tags: list[str] = Field(default_factory=list)
    headings: list[str] = Field(default_factory=list)
    resolved_links: list[str] = Field(default_factory=list)
    unresolved_links: list[str] = Field(default_factory=list)
    embeds: list[str] = Field(default_factory=list)
    aliases: list[str] = Field(default_factory=list)

    content_hash: str = Field(..., description="Plugin-computed SHA-256 of the raw content")
    mtime: datetime
    ctime: datetime


class SyncBatchRequest(_PluginBase):
    """Batch upsert. Plugin sends 10-20 notes per request to amortize HTTP overhead."""

    vault_id: str
    notes: list[NotePayload] = Field(default_factory=list, max_length=100)


class RenameItem(_PluginBase):
    # One rename event: both paths are vault-relative.
    old_path: str
    new_path: str


class RenameBatchRequest(_PluginBase):
    # Batch of rename events for a single vault.
    vault_id: str
    renames: list[RenameItem] = Field(default_factory=list, max_length=200)


class DeleteBatchRequest(_PluginBase):
    # Batch soft-delete by vault-relative path.
    vault_id: str
    paths: list[str] = Field(default_factory=list, max_length=500)


class ManifestEntry(_PluginBase):
    """One row of the server-side manifest used by the plugin to reconcile."""

    # Field name 'hash' is part of the wire contract (shadows the builtin
    # only inside this model's namespace).
    hash: str
    mtime: datetime


class ManifestResponse(_PluginBase):
    """Path-keyed manifest of every non-deleted note for a vault."""

    vault_id: str
    items: dict[str, ManifestEntry] = Field(default_factory=dict)


class ConnectRequest(_PluginBase):
    """First-call handshake to register or look up a vault connector row."""

    vault_id: str
    vault_name: str
    search_space_id: int
    plugin_version: str
    device_id: str
    device_label: str | None = Field(
        default=None,
        description="User-friendly device name shown in the web UI (e.g. 'iPad Pro').",
    )


class ConnectResponse(_PluginBase):
    """Returned from POST /connect.

    Carries the same handshake fields as ``HealthResponse`` so the plugin
    learns the contract on its very first call without an extra round-trip
    to ``GET /health``.
    """

    connector_id: int
    vault_id: str
    search_space_id: int
    api_version: str
    capabilities: list[str]


class HealthResponse(_PluginBase):
    """API contract handshake.

    The plugin calls ``GET /health`` once per ``onload`` and caches the
    result. ``capabilities`` is a forward-extensible string list: future
    additions (``'pat_auth'``, ``'scoped_pat'``, ``'attachments_v2'``,
    ``'shared_search_spaces'``...) ship without breaking older plugins
    because they only enable extra behavior, never gate existing endpoints.
    """

    api_version: str
    capabilities: list[str]
    server_time_utc: datetime


# ---------------------------------------------------------------------------
# file: surfsense_backend/app/services/obsidian_plugin_indexer.py (new file)
# — module continues below.
# ---------------------------------------------------------------------------
+""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime +from typing import Any +from urllib.parse import quote + +from sqlalchemy import and_, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import ( + Document, + DocumentStatus, + DocumentType, + SearchSourceConnector, +) +from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService +from app.schemas.obsidian_plugin import ( + ManifestEntry, + ManifestResponse, + NotePayload, +) +from app.services.llm_service import get_user_long_context_llm +from app.utils.document_converters import generate_unique_identifier_hash +from app.utils.document_versioning import create_version_snapshot + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _vault_path_unique_id(vault_id: str, path: str) -> str: + """Stable identifier for a note. Vault-scoped so the same path under two + different vaults doesn't collide.""" + return f"{vault_id}:{path}" + + +def _build_source_url(vault_name: str, path: str) -> str: + """Build the ``obsidian://`` deep link for the web UI's "Open in Obsidian" + button. Both segments are URL-encoded because vault names and paths can + contain spaces, ``#``, ``?``, etc. + """ + return ( + "obsidian://open" + f"?vault={quote(vault_name, safe='')}" + f"&file={quote(path, safe='')}" + ) + + +def _build_metadata( + payload: NotePayload, + *, + vault_name: str, + connector_id: int, + extra: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Flatten the rich plugin payload into the JSONB ``document_metadata`` + column. Keys here are what the chat UI / search UI surface to users. 
+ """ + meta: dict[str, Any] = { + "source": "plugin", + "vault_id": payload.vault_id, + "vault_name": vault_name, + "file_path": payload.path, + "file_name": payload.name, + "extension": payload.extension, + "frontmatter": payload.frontmatter, + "tags": payload.tags, + "headings": payload.headings, + "outgoing_links": payload.resolved_links, + "unresolved_links": payload.unresolved_links, + "embeds": payload.embeds, + "aliases": payload.aliases, + "plugin_content_hash": payload.content_hash, + "mtime": payload.mtime.isoformat(), + "ctime": payload.ctime.isoformat(), + "connector_id": connector_id, + "url": _build_source_url(vault_name, payload.path), + } + if extra: + meta.update(extra) + return meta + + +def _build_document_string(payload: NotePayload, vault_name: str) -> str: + """Compose the indexable string the pipeline embeds and chunks. + + Mirrors the legacy obsidian indexer's METADATA + CONTENT framing so + existing search relevance heuristics keep working unchanged. + """ + tags_line = ", ".join(payload.tags) if payload.tags else "None" + links_line = ( + ", ".join(payload.resolved_links) if payload.resolved_links else "None" + ) + return ( + "\n" + f"Title: {payload.name}\n" + f"Vault: {vault_name}\n" + f"Path: {payload.path}\n" + f"Tags: {tags_line}\n" + f"Links to: {links_line}\n" + "\n\n" + "\n" + f"{payload.content}\n" + "\n" + ) + + +async def _find_existing_document( + session: AsyncSession, + *, + search_space_id: int, + vault_id: str, + path: str, +) -> Document | None: + unique_id = _vault_path_unique_id(vault_id, path) + uid_hash = generate_unique_identifier_hash( + DocumentType.OBSIDIAN_CONNECTOR, + unique_id, + search_space_id, + ) + result = await session.execute( + select(Document).where(Document.unique_identifier_hash == uid_hash) + ) + return result.scalars().first() + + +# --------------------------------------------------------------------------- +# Public API +# 
--------------------------------------------------------------------------- + + +async def upsert_note( + session: AsyncSession, + *, + connector: SearchSourceConnector, + payload: NotePayload, + user_id: str, +) -> Document: + """Index or refresh a single note pushed by the plugin. + + Returns the resulting ``Document`` (whether newly created, updated, or + a skip-because-unchanged hit). + """ + vault_name: str = (connector.config or {}).get("vault_name") or "Vault" + search_space_id = connector.search_space_id + + existing = await _find_existing_document( + session, + search_space_id=search_space_id, + vault_id=payload.vault_id, + path=payload.path, + ) + + plugin_hash = payload.content_hash + if existing is not None: + existing_meta = existing.document_metadata or {} + was_tombstoned = bool(existing_meta.get("deleted_at")) + + if ( + not was_tombstoned + and existing_meta.get("plugin_content_hash") == plugin_hash + and DocumentStatus.is_state(existing.status, DocumentStatus.READY) + ): + return existing + + try: + await create_version_snapshot(session, existing) + except Exception: + logger.debug( + "version snapshot failed for obsidian doc %s", + existing.id, + exc_info=True, + ) + + document_string = _build_document_string(payload, vault_name) + metadata = _build_metadata( + payload, + vault_name=vault_name, + connector_id=connector.id, + ) + + connector_doc = ConnectorDocument( + title=payload.name, + source_markdown=document_string, + unique_id=_vault_path_unique_id(payload.vault_id, payload.path), + document_type=DocumentType.OBSIDIAN_CONNECTOR, + search_space_id=search_space_id, + connector_id=connector.id, + created_by_id=str(user_id), + should_summarize=connector.enable_summary, + fallback_summary=f"Obsidian Note: {payload.name}\n\n{payload.content}", + metadata=metadata, + ) + + pipeline = IndexingPipelineService(session) + prepared = await pipeline.prepare_for_indexing([connector_doc]) + if not prepared: + if existing is not None: + return existing + 
raise RuntimeError( + f"Indexing pipeline rejected obsidian note {payload.path}" + ) + + document = prepared[0] + + llm = await get_user_long_context_llm(session, str(user_id), search_space_id) + return await pipeline.index(document, connector_doc, llm) + + +async def rename_note( + session: AsyncSession, + *, + connector: SearchSourceConnector, + old_path: str, + new_path: str, + vault_id: str, +) -> Document | None: + """Rewrite path-derived columns without re-indexing content. + + Returns the updated document, or ``None`` if no row matched the + ``old_path`` (this happens when the plugin is renaming a file that was + never synced — safe to ignore, the next ``sync`` will create it under + the new path). + """ + vault_name: str = (connector.config or {}).get("vault_name") or "Vault" + search_space_id = connector.search_space_id + + existing = await _find_existing_document( + session, + search_space_id=search_space_id, + vault_id=vault_id, + path=old_path, + ) + if existing is None: + return None + + new_unique_id = _vault_path_unique_id(vault_id, new_path) + new_uid_hash = generate_unique_identifier_hash( + DocumentType.OBSIDIAN_CONNECTOR, + new_unique_id, + search_space_id, + ) + + collision = await session.execute( + select(Document).where( + and_( + Document.unique_identifier_hash == new_uid_hash, + Document.id != existing.id, + ) + ) + ) + collision_row = collision.scalars().first() + if collision_row is not None: + logger.warning( + "obsidian rename target already exists " + "(vault=%s old=%s new=%s); skipping rename so the next /sync " + "can resolve the conflict via content_hash", + vault_id, + old_path, + new_path, + ) + return existing + + new_filename = new_path.rsplit("/", 1)[-1] + new_stem = new_filename.rsplit(".", 1)[0] if "." 
in new_filename else new_filename + + existing.unique_identifier_hash = new_uid_hash + existing.title = new_stem + + meta = dict(existing.document_metadata or {}) + meta["file_path"] = new_path + meta["file_name"] = new_stem + meta["url"] = _build_source_url(vault_name, new_path) + existing.document_metadata = meta + existing.updated_at = datetime.now(UTC) + + await session.commit() + return existing + + +async def delete_note( + session: AsyncSession, + *, + connector: SearchSourceConnector, + vault_id: str, + path: str, +) -> bool: + """Soft-delete via tombstone in ``document_metadata``. + + The row is *not* removed and chunks are *not* dropped, so existing + citations in chat threads remain resolvable. The manifest endpoint + filters tombstoned rows out, so the plugin's reconcile pass will not + see this path and won't try to "resurrect" a note the user deleted in + the SurfSense UI. + + Returns True if a row was tombstoned, False if no matching row existed. + """ + existing = await _find_existing_document( + session, + search_space_id=connector.search_space_id, + vault_id=vault_id, + path=path, + ) + if existing is None: + return False + + meta = dict(existing.document_metadata or {}) + if meta.get("deleted_at"): + return True + + meta["deleted_at"] = datetime.now(UTC).isoformat() + meta["deleted_by_source"] = "plugin" + existing.document_metadata = meta + existing.updated_at = datetime.now(UTC) + + await session.commit() + return True + + +async def get_manifest( + session: AsyncSession, + *, + connector: SearchSourceConnector, + vault_id: str, +) -> ManifestResponse: + """Return ``{path: {hash, mtime}}`` for every non-deleted note in this + vault. + + The plugin compares this against its local vault on every ``onload`` to + catch up edits made while offline. Rows missing ``plugin_content_hash`` + (e.g. tombstoned, or somehow indexed without going through this + service) are excluded so the plugin doesn't get confused by partial + data. 
+ """ + result = await session.execute( + select(Document).where( + and_( + Document.search_space_id == connector.search_space_id, + Document.connector_id == connector.id, + Document.document_type == DocumentType.OBSIDIAN_CONNECTOR, + ) + ) + ) + + items: dict[str, ManifestEntry] = {} + for doc in result.scalars().all(): + meta = doc.document_metadata or {} + if meta.get("deleted_at"): + continue + if meta.get("vault_id") != vault_id: + continue + path = meta.get("file_path") + plugin_hash = meta.get("plugin_content_hash") + mtime_raw = meta.get("mtime") + if not path or not plugin_hash or not mtime_raw: + continue + try: + mtime = datetime.fromisoformat(mtime_raw) + except ValueError: + continue + items[path] = ManifestEntry(hash=plugin_hash, mtime=mtime) + + return ManifestResponse(vault_id=vault_id, items=items)