feat: implement Obsidian plugin ingestion routes and indexing service

This commit is contained in:
Anish Sarkar 2026-04-20 04:03:19 +05:30
parent f903bcc80d
commit e8fc1069bc
5 changed files with 1074 additions and 0 deletions

View file

@ -0,0 +1,75 @@
"""129_deactivate_legacy_obsidian_connectors
Revision ID: 129
Revises: 128
Create Date: 2026-04-18
Marks every pre-plugin OBSIDIAN_CONNECTOR row as legacy. We keep the
rows (and their indexed Documents) so existing search results don't
suddenly disappear, but we:
* set ``is_indexable = false`` and ``periodic_indexing_enabled = false``
so the scheduler will never fire a server-side scan again,
* clear ``next_scheduled_at`` so the scheduler stops considering the
row,
* merge ``{"legacy": true, "deactivated_at": "<now>"}`` into ``config``
so the new ObsidianConfig view in the web UI can render the
migration banner (and so a future cleanup script can find them).
A row is "pre-plugin" when its ``config`` does not already have
``source = "plugin"``. The new plugin indexer always writes
``config.source = "plugin"`` on first /obsidian/connect, so this
predicate is stable.
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "129"
down_revision: str | None = "128"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
    """Mark every pre-plugin OBSIDIAN_CONNECTOR row as legacy.

    One raw UPDATE: turn off indexing flags, clear the schedule slot, and
    merge the legacy markers into ``config`` (see module docstring).
    """
    statement = sa.text(
        """
        UPDATE search_source_connectors
        SET
            is_indexable = false,
            periodic_indexing_enabled = false,
            next_scheduled_at = NULL,
            config = COALESCE(config, '{}'::json)::jsonb
                || jsonb_build_object(
                    'legacy', true,
                    'deactivated_at', to_char(
                        now() AT TIME ZONE 'UTC',
                        'YYYY-MM-DD"T"HH24:MI:SS"Z"'
                    )
                )
        WHERE connector_type = 'OBSIDIAN_CONNECTOR'
          AND COALESCE((config::jsonb)->>'source', '') <> 'plugin'
        """
    )
    op.get_bind().execute(statement)
def downgrade() -> None:
    """Strip the legacy markers added by :func:`upgrade`.

    Indexing flags are NOT restored — we only remove the two config keys
    so the migration is reversible without resurrecting server-side scans.
    """
    statement = sa.text(
        """
        UPDATE search_source_connectors
        SET config = (config::jsonb - 'legacy' - 'deactivated_at')::json
        WHERE connector_type = 'OBSIDIAN_CONNECTOR'
          AND (config::jsonb) ? 'legacy'
        """
    )
    op.get_bind().execute(statement)

View file

@ -37,6 +37,7 @@ from .new_llm_config_routes import router as new_llm_config_router
from .notes_routes import router as notes_router
from .notifications_routes import router as notifications_router
from .notion_add_connector_route import router as notion_add_connector_router
from .obsidian_plugin_routes import router as obsidian_plugin_router
from .onedrive_add_connector_route import router as onedrive_add_connector_router
from .podcasts_routes import router as podcasts_router
from .prompts_routes import router as prompts_router
@ -84,6 +85,7 @@ router.include_router(notion_add_connector_router)
router.include_router(slack_add_connector_router)
router.include_router(teams_add_connector_router)
router.include_router(onedrive_add_connector_router)
router.include_router(obsidian_plugin_router) # Obsidian plugin push API
router.include_router(discord_add_connector_router)
router.include_router(jira_add_connector_router)
router.include_router(confluence_add_connector_router)

View file

@ -0,0 +1,450 @@
"""
Obsidian plugin ingestion routes.
This is the public surface that the SurfSense Obsidian plugin
(``surfsense_obsidian/``) speaks to. It is a separate router from the
legacy server-path Obsidian connector — the legacy code stays in place
until the ``obsidian-legacy-cleanup`` plan ships.
Endpoints
---------
- ``GET /api/v1/obsidian/health`` version handshake
- ``POST /api/v1/obsidian/connect`` register or get a vault row
- ``POST /api/v1/obsidian/sync`` batch upsert
- ``POST /api/v1/obsidian/rename`` batch rename
- ``DELETE /api/v1/obsidian/notes`` batch soft-delete
- ``GET /api/v1/obsidian/manifest`` reconcile manifest
Auth contract
-------------
Every endpoint requires ``Depends(current_active_user)`` — the same JWT
bearer the rest of the API uses; future PAT migration is transparent.
API stability is provided by the ``/api/v1/...`` URL prefix and the
``capabilities`` array advertised on ``/health`` (additive only). There
is no plugin-version gate; "your plugin is out of date" notices are
delegated to Obsidian's built-in community-store updater.
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
SearchSpace,
User,
get_async_session,
)
from app.schemas.obsidian_plugin import (
ConnectRequest,
ConnectResponse,
DeleteBatchRequest,
HealthResponse,
ManifestResponse,
RenameBatchRequest,
SyncBatchRequest,
)
from app.services.obsidian_plugin_indexer import (
delete_note,
get_manifest,
rename_note,
upsert_note,
)
from app.users import current_active_user
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/obsidian", tags=["obsidian-plugin"])
# Bumped manually whenever the wire contract gains a non-additive change.
# Additive (extra='ignore'-safe) changes do NOT bump this.
OBSIDIAN_API_VERSION = "1"
# Capabilities advertised on /health and /connect. Plugins use this list
# for feature gating ("does this server understand attachments_v2?"). Add
# new strings, never rename/remove existing ones — older plugins ignore
# unknown entries safely.
OBSIDIAN_CAPABILITIES: list[str] = ["sync", "rename", "delete", "manifest"]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _build_handshake() -> dict[str, object]:
    """Assemble the api_version/capabilities payload shared by /health and /connect."""
    handshake: dict[str, object] = {"api_version": OBSIDIAN_API_VERSION}
    # Copy so response serialization can never mutate the module-level list.
    handshake["capabilities"] = list(OBSIDIAN_CAPABILITIES)
    return handshake
async def _resolve_vault_connector(
    session: AsyncSession,
    *,
    user: User,
    vault_id: str,
) -> SearchSourceConnector:
    """Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user.

    Looked up by the (user_id, connector_type, config['vault_id']) tuple so a
    user can run multiple vaults, each backed by its own connector row (one
    per search space). Only rows with ``config.source == "plugin"`` count;
    raises a 404 with code ``VAULT_NOT_REGISTERED`` otherwise.
    """
    rows = await session.execute(
        select(SearchSourceConnector).where(
            and_(
                SearchSourceConnector.user_id == user.id,
                SearchSourceConnector.connector_type
                == SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
            )
        )
    )
    match = next(
        (
            row
            for row in rows.scalars().all()
            if (row.config or {}).get("vault_id") == vault_id
            and (row.config or {}).get("source") == "plugin"
        ),
        None,
    )
    if match is not None:
        return match
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail={
            "code": "VAULT_NOT_REGISTERED",
            "message": (
                "No Obsidian plugin connector found for this vault. "
                "Call POST /obsidian/connect first."
            ),
            "vault_id": vault_id,
        },
    )
async def _ensure_search_space_access(
    session: AsyncSession,
    *,
    user: User,
    search_space_id: int,
) -> SearchSpace:
    """Return the search space iff the caller owns it, else raise 403.

    Shared search spaces (RBAC roles) are deliberately unsupported for the
    plugin for now — owner-only keeps the surface narrow and avoids leaking
    other members' connectors.
    """
    query = select(SearchSpace).where(
        and_(SearchSpace.id == search_space_id, SearchSpace.user_id == user.id)
    )
    space = (await session.execute(query)).scalars().first()
    if space is not None:
        return space
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail={
            "code": "SEARCH_SPACE_FORBIDDEN",
            "message": "You don't own that search space.",
        },
    )
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/health", response_model=HealthResponse)
async def obsidian_health(
user: User = Depends(current_active_user),
) -> HealthResponse:
"""Return the API contract handshake.
The plugin calls this once per ``onload`` and caches the result for
capability-gating decisions.
"""
return HealthResponse(
**_build_handshake(),
server_time_utc=datetime.now(UTC),
)
@router.post("/connect", response_model=ConnectResponse)
async def obsidian_connect(
payload: ConnectRequest,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> ConnectResponse:
"""Register a vault, or return the existing connector row.
Idempotent on the (user_id, OBSIDIAN_CONNECTOR, vault_id) tuple so
re-installing the plugin or reconnecting from a new device picks up
the same connector and therefore the same documents.
"""
await _ensure_search_space_access(
session, user=user, search_space_id=payload.search_space_id
)
result = await session.execute(
select(SearchSourceConnector).where(
and_(
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
)
)
)
existing: SearchSourceConnector | None = None
for candidate in result.scalars().all():
cfg = candidate.config or {}
if cfg.get("vault_id") == payload.vault_id:
existing = candidate
break
now_iso = datetime.now(UTC).isoformat()
if existing is not None:
cfg = dict(existing.config or {})
cfg.update(
{
"vault_id": payload.vault_id,
"vault_name": payload.vault_name,
"source": "plugin",
"plugin_version": payload.plugin_version,
"device_id": payload.device_id,
"last_connect_at": now_iso,
}
)
if payload.device_label:
cfg["device_label"] = payload.device_label
cfg.pop("legacy", None)
cfg.pop("vault_path", None)
existing.config = cfg
existing.is_indexable = False
existing.search_space_id = payload.search_space_id
await session.commit()
await session.refresh(existing)
connector = existing
else:
connector = SearchSourceConnector(
name=f"Obsidian — {payload.vault_name}",
connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
is_indexable=False,
config={
"vault_id": payload.vault_id,
"vault_name": payload.vault_name,
"source": "plugin",
"plugin_version": payload.plugin_version,
"device_id": payload.device_id,
"device_label": payload.device_label,
"files_synced": 0,
"last_connect_at": now_iso,
},
user_id=user.id,
search_space_id=payload.search_space_id,
)
session.add(connector)
await session.commit()
await session.refresh(connector)
return ConnectResponse(
connector_id=connector.id,
vault_id=payload.vault_id,
search_space_id=connector.search_space_id,
**_build_handshake(),
)
@router.post("/sync")
async def obsidian_sync(
payload: SyncBatchRequest,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]:
"""Batch-upsert notes pushed by the plugin.
Returns per-note ack so the plugin can dequeue successes and retry
failures.
"""
connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id
)
results: list[dict[str, object]] = []
indexed = 0
failed = 0
for note in payload.notes:
try:
doc = await upsert_note(
session, connector=connector, payload=note, user_id=str(user.id)
)
indexed += 1
results.append(
{"path": note.path, "status": "ok", "document_id": doc.id}
)
except HTTPException:
raise
except Exception as exc:
failed += 1
logger.exception(
"obsidian /sync failed for path=%s vault=%s",
note.path,
payload.vault_id,
)
results.append(
{"path": note.path, "status": "error", "error": str(exc)[:300]}
)
cfg = dict(connector.config or {})
cfg["last_sync_at"] = datetime.now(UTC).isoformat()
cfg["files_synced"] = int(cfg.get("files_synced", 0)) + indexed
connector.config = cfg
await session.commit()
return {
"vault_id": payload.vault_id,
"indexed": indexed,
"failed": failed,
"results": results,
}
@router.post("/rename")
async def obsidian_rename(
payload: RenameBatchRequest,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]:
"""Apply a batch of vault rename events."""
connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id
)
results: list[dict[str, object]] = []
renamed = 0
missing = 0
for item in payload.renames:
try:
doc = await rename_note(
session,
connector=connector,
old_path=item.old_path,
new_path=item.new_path,
vault_id=payload.vault_id,
)
if doc is None:
missing += 1
results.append(
{
"old_path": item.old_path,
"new_path": item.new_path,
"status": "missing",
}
)
else:
renamed += 1
results.append(
{
"old_path": item.old_path,
"new_path": item.new_path,
"status": "ok",
"document_id": doc.id,
}
)
except Exception as exc:
logger.exception(
"obsidian /rename failed for old=%s new=%s vault=%s",
item.old_path,
item.new_path,
payload.vault_id,
)
results.append(
{
"old_path": item.old_path,
"new_path": item.new_path,
"status": "error",
"error": str(exc)[:300],
}
)
return {
"vault_id": payload.vault_id,
"renamed": renamed,
"missing": missing,
"results": results,
}
@router.delete("/notes")
async def obsidian_delete_notes(
payload: DeleteBatchRequest,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]:
"""Soft-delete a batch of notes by vault-relative path."""
connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id
)
deleted = 0
missing = 0
results: list[dict[str, object]] = []
for path in payload.paths:
try:
ok = await delete_note(
session,
connector=connector,
vault_id=payload.vault_id,
path=path,
)
if ok:
deleted += 1
results.append({"path": path, "status": "ok"})
else:
missing += 1
results.append({"path": path, "status": "missing"})
except Exception as exc:
logger.exception(
"obsidian DELETE /notes failed for path=%s vault=%s",
path,
payload.vault_id,
)
results.append(
{"path": path, "status": "error", "error": str(exc)[:300]}
)
return {
"vault_id": payload.vault_id,
"deleted": deleted,
"missing": missing,
"results": results,
}
@router.get("/manifest", response_model=ManifestResponse)
async def obsidian_manifest(
vault_id: str = Query(..., description="Plugin-side stable vault UUID"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> ManifestResponse:
"""Return the server-side ``{path: {hash, mtime}}`` manifest.
Used by the plugin's ``onload`` reconcile to find files that were
edited or deleted while the plugin was offline.
"""
connector = await _resolve_vault_connector(
session, user=user, vault_id=vault_id
)
return await get_manifest(session, connector=connector, vault_id=vault_id)

View file

@ -0,0 +1,147 @@
"""
Obsidian Plugin connector schemas.
Wire format spoken between the SurfSense Obsidian plugin
(``surfsense_obsidian/``) and the FastAPI backend.
Stability contract
------------------
Every request and response schema sets ``model_config = ConfigDict(extra='ignore')``.
This is the API stability contract, not just hygiene:
- Old plugins talking to a newer backend silently drop any new response fields
they don't understand instead of failing validation.
- New plugins talking to an older backend can include forward-looking request
fields (e.g. attachments metadata) without the older backend rejecting them.
Hard breaking changes are reserved for the URL prefix (``/api/v2/...``).
Additive evolution is signaled via the ``capabilities`` array on
``HealthResponse`` / ``ConnectResponse`` — older plugins ignore unknown
capability strings safely.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
_PLUGIN_MODEL_CONFIG = ConfigDict(extra="ignore")
class _PluginBase(BaseModel):
    """Base class for all plugin payload schemas.

    Carries the forward-compatibility config so subclasses don't have to
    repeat it.
    """

    # extra="ignore": unknown wire fields are silently dropped instead of
    # failing validation — the module-level forward/backward compat contract.
    model_config = _PLUGIN_MODEL_CONFIG
class NotePayload(_PluginBase):
    """One Obsidian note as pushed by the plugin.

    The plugin is the source of truth: ``content`` is the post-frontmatter
    body, ``frontmatter``/``tags``/``headings``/etc. are precomputed by the
    plugin via ``app.metadataCache`` so the backend doesn't have to re-parse.
    """

    vault_id: str = Field(..., description="Stable plugin-generated UUID for this vault")
    path: str = Field(..., description="Vault-relative path, e.g. 'notes/foo.md'")
    name: str = Field(..., description="File stem (no extension)")
    extension: str = Field(default="md", description="File extension without leading dot")
    content: str = Field(default="", description="Raw markdown body (post-frontmatter)")
    # The collections below are precomputed plugin-side (see class docstring)
    # and stored verbatim in the backend's document metadata.
    frontmatter: dict[str, Any] = Field(default_factory=dict)
    tags: list[str] = Field(default_factory=list)
    headings: list[str] = Field(default_factory=list)
    resolved_links: list[str] = Field(default_factory=list)
    unresolved_links: list[str] = Field(default_factory=list)
    embeds: list[str] = Field(default_factory=list)
    aliases: list[str] = Field(default_factory=list)
    content_hash: str = Field(..., description="Plugin-computed SHA-256 of the raw content")
    # File modification / creation times as reported by the plugin.
    mtime: datetime
    ctime: datetime
class SyncBatchRequest(_PluginBase):
    """Batch upsert. Plugin sends 10-20 notes per request to amortize HTTP overhead."""

    vault_id: str
    # max_length is the hard server-side cap; typical batches are far smaller.
    notes: list[NotePayload] = Field(default_factory=list, max_length=100)


class RenameItem(_PluginBase):
    """A single rename event: old vault-relative path -> new one."""

    old_path: str
    new_path: str


class RenameBatchRequest(_PluginBase):
    """Batch of rename events for one vault."""

    vault_id: str
    renames: list[RenameItem] = Field(default_factory=list, max_length=200)


class DeleteBatchRequest(_PluginBase):
    """Batch soft-delete request: vault-relative paths to remove."""

    vault_id: str
    paths: list[str] = Field(default_factory=list, max_length=500)
class ManifestEntry(_PluginBase):
    """One row of the server-side manifest used by the plugin to reconcile."""

    # Plugin-computed content hash, echoed back exactly as the plugin sent it.
    hash: str
    mtime: datetime


class ManifestResponse(_PluginBase):
    """Path-keyed manifest of every non-deleted note for a vault."""

    vault_id: str
    # Keyed by vault-relative path.
    items: dict[str, ManifestEntry] = Field(default_factory=dict)
class ConnectRequest(_PluginBase):
    """First-call handshake to register or look up a vault connector row."""

    vault_id: str
    vault_name: str
    # The search space the connector row (and its documents) will live in.
    search_space_id: int
    plugin_version: str
    device_id: str
    device_label: str | None = Field(
        default=None,
        description="User-friendly device name shown in the web UI (e.g. 'iPad Pro').",
    )


class ConnectResponse(_PluginBase):
    """Returned from POST /connect.

    Carries the same handshake fields as ``HealthResponse`` so the plugin
    learns the contract on its very first call without an extra round-trip
    to ``GET /health``.
    """

    connector_id: int
    vault_id: str
    search_space_id: int
    # Handshake fields, mirrored from HealthResponse.
    api_version: str
    capabilities: list[str]
class HealthResponse(_PluginBase):
    """API contract handshake.

    The plugin calls ``GET /health`` once per ``onload`` and caches the
    result. ``capabilities`` is a forward-extensible string list: future
    additions (``'pat_auth'``, ``'scoped_pat'``, ``'attachments_v2'``,
    ``'shared_search_spaces'``...) ship without breaking older plugins
    because they only enable extra behavior, never gate existing endpoints.
    """

    api_version: str
    capabilities: list[str]
    # Server clock, useful for plugin-side drift diagnostics.
    server_time_utc: datetime

View file

@ -0,0 +1,400 @@
"""
Obsidian plugin indexer service.
Bridges the SurfSense Obsidian plugin's HTTP payloads
(see ``app/schemas/obsidian_plugin.py``) into the shared
``IndexingPipelineService``.
Responsibilities:
- ``upsert_note`` push one note through the indexing pipeline; respects
unchanged content (skip) and version-snapshots existing rows before
rewrite.
- ``rename_note`` rewrite path-derived fields (path metadata,
``unique_identifier_hash``, ``source_url``) without re-indexing content.
- ``delete_note`` soft delete with a tombstone in ``document_metadata``
so reconciliation can distinguish "user explicitly killed this in the UI"
from "plugin hasn't synced yet".
- ``get_manifest`` return ``{path: {hash, mtime}}`` for every non-deleted
note belonging to a vault, used by the plugin's reconcile pass on
``onload``.
Design notes
------------
The plugin's content hash and the backend's ``content_hash`` are computed
differently (plugin uses raw SHA-256 of the markdown body; backend salts
with ``search_space_id``). We persist the plugin's hash in
``document_metadata['plugin_content_hash']`` so the manifest endpoint can
return what the plugin sent — that's the only number the plugin can
compare without re-downloading content.
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from typing import Any
from urllib.parse import quote
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import (
Document,
DocumentStatus,
DocumentType,
SearchSourceConnector,
)
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.schemas.obsidian_plugin import (
ManifestEntry,
ManifestResponse,
NotePayload,
)
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import generate_unique_identifier_hash
from app.utils.document_versioning import create_version_snapshot
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _vault_path_unique_id(vault_id: str, path: str) -> str:
"""Stable identifier for a note. Vault-scoped so the same path under two
different vaults doesn't collide."""
return f"{vault_id}:{path}"
def _build_source_url(vault_name: str, path: str) -> str:
"""Build the ``obsidian://`` deep link for the web UI's "Open in Obsidian"
button. Both segments are URL-encoded because vault names and paths can
contain spaces, ``#``, ``?``, etc.
"""
return (
"obsidian://open"
f"?vault={quote(vault_name, safe='')}"
f"&file={quote(path, safe='')}"
)
def _build_metadata(
    payload: NotePayload,
    *,
    vault_name: str,
    connector_id: int,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Flatten the rich plugin payload into the JSONB ``document_metadata`` dict.

    The keys here are what the chat UI / search UI surface to users. Any
    ``extra`` entries are merged last and therefore win on key conflicts.
    """
    base: dict[str, Any] = {
        "source": "plugin",
        "vault_id": payload.vault_id,
        "vault_name": vault_name,
        "file_path": payload.path,
        "file_name": payload.name,
        "extension": payload.extension,
        "frontmatter": payload.frontmatter,
        "tags": payload.tags,
        "headings": payload.headings,
        "outgoing_links": payload.resolved_links,
        "unresolved_links": payload.unresolved_links,
        "embeds": payload.embeds,
        "aliases": payload.aliases,
        "plugin_content_hash": payload.content_hash,
        "mtime": payload.mtime.isoformat(),
        "ctime": payload.ctime.isoformat(),
        "connector_id": connector_id,
        "url": _build_source_url(vault_name, payload.path),
    }
    if not extra:
        return base
    return {**base, **extra}
def _build_document_string(payload: NotePayload, vault_name: str) -> str:
"""Compose the indexable string the pipeline embeds and chunks.
Mirrors the legacy obsidian indexer's METADATA + CONTENT framing so
existing search relevance heuristics keep working unchanged.
"""
tags_line = ", ".join(payload.tags) if payload.tags else "None"
links_line = (
", ".join(payload.resolved_links) if payload.resolved_links else "None"
)
return (
"<METADATA>\n"
f"Title: {payload.name}\n"
f"Vault: {vault_name}\n"
f"Path: {payload.path}\n"
f"Tags: {tags_line}\n"
f"Links to: {links_line}\n"
"</METADATA>\n\n"
"<CONTENT>\n"
f"{payload.content}\n"
"</CONTENT>\n"
)
async def _find_existing_document(
    session: AsyncSession,
    *,
    search_space_id: int,
    vault_id: str,
    path: str,
) -> Document | None:
    """Look up the Document row keyed by this vault-scoped path, or None."""
    uid_hash = generate_unique_identifier_hash(
        DocumentType.OBSIDIAN_CONNECTOR,
        _vault_path_unique_id(vault_id, path),
        search_space_id,
    )
    rows = await session.execute(
        select(Document).where(Document.unique_identifier_hash == uid_hash)
    )
    return rows.scalars().first()
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def upsert_note(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    payload: NotePayload,
    user_id: str,
) -> Document:
    """Index or refresh a single note pushed by the plugin.

    Args:
        session: Active async DB session (commits happen downstream in the
            indexing pipeline — NOTE(review): confirm against
            ``IndexingPipelineService``).
        connector: The plugin's OBSIDIAN_CONNECTOR row for this vault.
        payload: The note as pushed by the plugin.
        user_id: Owner id, stringified for pipeline / LLM lookups.

    Returns the resulting ``Document`` (whether newly created, updated, or
    a skip-because-unchanged hit).
    """
    vault_name: str = (connector.config or {}).get("vault_name") or "Vault"
    search_space_id = connector.search_space_id
    existing = await _find_existing_document(
        session,
        search_space_id=search_space_id,
        vault_id=payload.vault_id,
        path=payload.path,
    )
    plugin_hash = payload.content_hash
    if existing is not None:
        existing_meta = existing.document_metadata or {}
        was_tombstoned = bool(existing_meta.get("deleted_at"))
        # Skip-if-unchanged: same plugin hash, status READY, and not
        # tombstoned — a tombstoned row must be re-indexed to revive it.
        if (
            not was_tombstoned
            and existing_meta.get("plugin_content_hash") == plugin_hash
            and DocumentStatus.is_state(existing.status, DocumentStatus.READY)
        ):
            return existing
        # Snapshot the current row before rewriting; versioning is
        # best-effort and must never block indexing, hence debug-only log.
        try:
            await create_version_snapshot(session, existing)
        except Exception:
            logger.debug(
                "version snapshot failed for obsidian doc %s",
                existing.id,
                exc_info=True,
            )
    document_string = _build_document_string(payload, vault_name)
    metadata = _build_metadata(
        payload,
        vault_name=vault_name,
        connector_id=connector.id,
    )
    connector_doc = ConnectorDocument(
        title=payload.name,
        source_markdown=document_string,
        unique_id=_vault_path_unique_id(payload.vault_id, payload.path),
        document_type=DocumentType.OBSIDIAN_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector.id,
        created_by_id=str(user_id),
        should_summarize=connector.enable_summary,
        fallback_summary=f"Obsidian Note: {payload.name}\n\n{payload.content}",
        metadata=metadata,
    )
    pipeline = IndexingPipelineService(session)
    prepared = await pipeline.prepare_for_indexing([connector_doc])
    if not prepared:
        # The pipeline can reject a document (presumably dedup/validation —
        # TODO confirm). Fall back to the existing row if we have one so
        # the caller still gets a Document to ack.
        if existing is not None:
            return existing
        raise RuntimeError(
            f"Indexing pipeline rejected obsidian note {payload.path}"
        )
    document = prepared[0]
    llm = await get_user_long_context_llm(session, str(user_id), search_space_id)
    return await pipeline.index(document, connector_doc, llm)
async def rename_note(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    old_path: str,
    new_path: str,
    vault_id: str,
) -> Document | None:
    """Rewrite path-derived columns without re-indexing content.

    Returns the updated document, or ``None`` if no row matched the
    ``old_path`` (this happens when the plugin is renaming a file that was
    never synced — safe to ignore, the next ``sync`` will create it under
    the new path).
    """
    vault_name: str = (connector.config or {}).get("vault_name") or "Vault"
    search_space_id = connector.search_space_id
    existing = await _find_existing_document(
        session,
        search_space_id=search_space_id,
        vault_id=vault_id,
        path=old_path,
    )
    if existing is None:
        return None
    # The unique hash is path-derived, so a rename means recomputing it.
    new_unique_id = _vault_path_unique_id(vault_id, new_path)
    new_uid_hash = generate_unique_identifier_hash(
        DocumentType.OBSIDIAN_CONNECTOR,
        new_unique_id,
        search_space_id,
    )
    # Guard against a hash collision with another document (e.g. the
    # target path was already synced separately) — the column is unique,
    # so blindly updating would violate the constraint.
    collision = await session.execute(
        select(Document).where(
            and_(
                Document.unique_identifier_hash == new_uid_hash,
                Document.id != existing.id,
            )
        )
    )
    collision_row = collision.scalars().first()
    if collision_row is not None:
        logger.warning(
            "obsidian rename target already exists "
            "(vault=%s old=%s new=%s); skipping rename so the next /sync "
            "can resolve the conflict via content_hash",
            vault_id,
            old_path,
            new_path,
        )
        return existing
    # Derive the new title (file stem without extension) from the new path.
    new_filename = new_path.rsplit("/", 1)[-1]
    new_stem = new_filename.rsplit(".", 1)[0] if "." in new_filename else new_filename
    existing.unique_identifier_hash = new_uid_hash
    existing.title = new_stem
    # Rebuild a fresh dict so SQLAlchemy detects the JSONB change.
    meta = dict(existing.document_metadata or {})
    meta["file_path"] = new_path
    meta["file_name"] = new_stem
    meta["url"] = _build_source_url(vault_name, new_path)
    existing.document_metadata = meta
    existing.updated_at = datetime.now(UTC)
    await session.commit()
    return existing
async def delete_note(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    vault_id: str,
    path: str,
) -> bool:
    """Soft-delete via tombstone in ``document_metadata``.

    The row is *not* removed and chunks are *not* dropped, so existing
    citations in chat threads remain resolvable. The manifest endpoint
    filters tombstoned rows out, so the plugin's reconcile pass will not
    see this path and won't try to "resurrect" a note the user deleted in
    the SurfSense UI.

    Returns True if a row was tombstoned (or already was), False if no
    matching row existed.
    """
    doc = await _find_existing_document(
        session,
        search_space_id=connector.search_space_id,
        vault_id=vault_id,
        path=path,
    )
    if doc is None:
        return False
    tombstone = dict(doc.document_metadata or {})
    if tombstone.get("deleted_at"):
        # Already tombstoned — idempotent success, nothing to write.
        return True
    tombstone["deleted_at"] = datetime.now(UTC).isoformat()
    tombstone["deleted_by_source"] = "plugin"
    doc.document_metadata = tombstone
    doc.updated_at = datetime.now(UTC)
    await session.commit()
    return True
async def get_manifest(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    vault_id: str,
) -> ManifestResponse:
    """Return ``{path: {hash, mtime}}`` for every non-deleted note in this
    vault.

    The plugin compares this against its local vault on every ``onload`` to
    catch up edits made while offline. Rows missing ``plugin_content_hash``
    (e.g. tombstoned, or somehow indexed without going through this
    service) are excluded so the plugin doesn't get confused by partial
    data.
    """
    result = await session.execute(
        select(Document).where(
            and_(
                Document.search_space_id == connector.search_space_id,
                Document.connector_id == connector.id,
                Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
            )
        )
    )
    items: dict[str, ManifestEntry] = {}
    for doc in result.scalars().all():
        meta = doc.document_metadata or {}
        if meta.get("deleted_at"):
            continue
        if meta.get("vault_id") != vault_id:
            continue
        path = meta.get("file_path")
        plugin_hash = meta.get("plugin_content_hash")
        mtime_raw = meta.get("mtime")
        if not path or not plugin_hash or not mtime_raw:
            continue
        try:
            mtime = datetime.fromisoformat(mtime_raw)
        except (TypeError, ValueError):
            # ValueError: unparseable timestamp string.
            # TypeError: mtime stored as a non-string (corrupt or legacy
            # row) — previously this escaped and 500'd the whole manifest.
            continue
        items[path] = ManifestEntry(hash=plugin_hash, mtime=mtime)
    return ManifestResponse(vault_id=vault_id, items=items)