diff --git a/surfsense_backend/app/routes/obsidian_plugin_routes.py b/surfsense_backend/app/routes/obsidian_plugin_routes.py index 9b03464bb..edc99cae6 100644 --- a/surfsense_backend/app/routes/obsidian_plugin_routes.py +++ b/surfsense_backend/app/routes/obsidian_plugin_routes.py @@ -11,11 +11,13 @@ import logging from datetime import UTC, datetime from fastapi import APIRouter, Depends, HTTPException, Query, status -from sqlalchemy import and_ +from sqlalchemy import and_, case, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from app.db import ( + Document, + DocumentType, SearchSourceConnector, SearchSourceConnectorType, SearchSpace, @@ -48,7 +50,7 @@ router = APIRouter(prefix="/obsidian", tags=["obsidian-plugin"]) OBSIDIAN_API_VERSION = "1" # Plugins feature-gate on these. Add entries, never rename or remove. -OBSIDIAN_CAPABILITIES: list[str] = ["sync", "rename", "delete", "manifest"] +OBSIDIAN_CAPABILITIES: list[str] = ["sync", "rename", "delete", "manifest", "stats"] # --------------------------------------------------------------------------- @@ -63,47 +65,13 @@ def _build_handshake() -> dict[str, object]: } -def _upsert_device( - existing_devices: object, - device_id: str, - now_iso: str, -) -> dict[str, dict[str, str]]: - """Upsert ``device_id`` into ``{device_id: {first_seen_at, last_seen_at}}``. - - Keyed by device_id for O(1) dedup; ``len(devices)`` is the count. - Timestamps are kept for a future stale-device pruner. - """ - devices: dict[str, dict[str, str]] = {} - if isinstance(existing_devices, dict): - for key, val in existing_devices.items(): - if not isinstance(key, str) or not key or not isinstance(val, dict): - continue - devices[key] = { - "first_seen_at": str(val.get("first_seen_at") or now_iso), - "last_seen_at": str(val.get("last_seen_at") or now_iso), - } - - prev = devices.get(device_id) - devices[device_id] = { - "first_seen_at": prev["first_seen_at"] if prev else now_iso, - "last_seen_at": now_iso, - } - return devices - - async def _resolve_vault_connector( session: AsyncSession, *, user: User, vault_id: str, - for_update: bool = False, ) -> SearchSourceConnector: - """Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user. - - Callers that mutate ``connector.config`` MUST pass ``for_update=True`` or - concurrent heartbeats will race and lose writes on ``config.devices`` / - ``config.files_synced``. - """ + """Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user.""" stmt = select(SearchSourceConnector).where( and_( SearchSourceConnector.user_id == user.id, @@ -113,8 +81,6 @@ async def _resolve_vault_connector( SearchSourceConnector.config["source"].astext == "plugin", ) ) - if for_update: - stmt = stmt.with_for_update() connector = (await session.execute(stmt)).scalars().first() if connector is not None: @@ -182,14 +148,13 @@ async def obsidian_connect( """Register a vault, or return the existing connector row. Idempotent on (user_id, OBSIDIAN_CONNECTOR, vault_id). Called on every - plugin onload as a heartbeat — upserts ``device_id`` into - ``config['devices']`` so the web UI can show a "Devices: N" tile. + plugin onload as a heartbeat. """ await _ensure_search_space_access( session, user=user, search_space_id=payload.search_space_id ) - # FOR UPDATE so concurrent heartbeats can't clobber each other's device entry. + # FOR UPDATE so concurrent /connect calls for the same vault can't race. existing: SearchSourceConnector | None = ( ( await session.execute( @@ -211,19 +176,14 @@ async def obsidian_connect( ) now_iso = datetime.now(UTC).isoformat() + cfg = { + "vault_id": payload.vault_id, + "vault_name": payload.vault_name, + "source": "plugin", + "last_connect_at": now_iso, + } if existing is not None: - cfg = dict(existing.config or {}) - devices = _upsert_device(cfg.get("devices"), payload.device_id, now_iso) - cfg.update( - { - "vault_id": payload.vault_id, - "vault_name": payload.vault_name, - "source": "plugin", - "devices": devices, - "last_connect_at": now_iso, - } - ) existing.config = cfg # Re-stamp on every connect so vault renames in Obsidian propagate; # the web UI hides the Name input for Obsidian connectors. @@ -234,19 +194,11 @@ async def obsidian_connect( await session.refresh(existing) connector = existing else: - devices = _upsert_device(None, payload.device_id, now_iso) connector = SearchSourceConnector( name=f"Obsidian — {payload.vault_name}", connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR, is_indexable=False, - config={ - "vault_id": payload.vault_id, - "vault_name": payload.vault_name, - "source": "plugin", - "devices": devices, - "files_synced": 0, - "last_connect_at": now_iso, - }, + config=cfg, user_id=user.id, search_space_id=payload.search_space_id, ) @@ -270,7 +222,7 @@ async def obsidian_sync( ) -> dict[str, object]: """Batch-upsert notes; returns per-note ack so the plugin can dequeue/retry.""" connector = await _resolve_vault_connector( - session, user=user, vault_id=payload.vault_id, for_update=True + session, user=user, vault_id=payload.vault_id ) results: list[dict[str, object]] = [] @@ -299,12 +251,6 @@ async def obsidian_sync( {"path": note.path, "status": "error", "error": str(exc)[:300]} ) - cfg = dict(connector.config or {}) - cfg["last_sync_at"] = datetime.now(UTC).isoformat() - cfg["files_synced"] = int(cfg.get("files_synced", 0)) + indexed - connector.config = cfg - await session.commit() - return { "vault_id": payload.vault_id, "indexed": indexed, @@ -321,7 +267,7 @@ async def obsidian_rename( ) -> dict[str, object]: """Apply a batch of vault rename events.""" connector = await _resolve_vault_connector( - session, user=user, vault_id=payload.vault_id, for_update=True + session, user=user, vault_id=payload.vault_id ) results: list[dict[str, object]] = [] @@ -388,7 +334,7 @@ async def obsidian_delete_notes( ) -> dict[str, object]: """Soft-delete a batch of notes by vault-relative path.""" connector = await _resolve_vault_connector( - session, user=user, vault_id=payload.vault_id, for_update=True + session, user=user, vault_id=payload.vault_id ) deleted = 0 @@ -437,3 +383,41 @@ async def obsidian_manifest( session, user=user, vault_id=vault_id ) return await get_manifest(session, connector=connector, vault_id=vault_id) + + +@router.get("/stats") +async def obsidian_stats( + vault_id: str = Query(..., description="Plugin-side stable vault UUID"), + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> dict[str, object]: + """Active-note count + last sync time for the web tile. + + ``files_synced`` excludes tombstones so it matches ``/manifest``; + ``last_sync_at`` includes them so deletes advance the freshness signal. + """ + connector = await _resolve_vault_connector( + session, user=user, vault_id=vault_id + ) + + is_active = Document.document_metadata["deleted_at"].astext.is_(None) + + row = ( + await session.execute( + select( + func.count(case((is_active, 1))).label("files_synced"), + func.max(Document.updated_at).label("last_sync_at"), + ).where( + and_( + Document.connector_id == connector.id, + Document.document_type == DocumentType.OBSIDIAN_CONNECTOR, + ) + ) + ) + ).first() + + return { + "vault_id": vault_id, + "files_synced": int(row[0] or 0), + "last_sync_at": row[1].isoformat() if row[1] else None, + } diff --git a/surfsense_backend/app/schemas/obsidian_plugin.py b/surfsense_backend/app/schemas/obsidian_plugin.py index 94b0cdb77..7f77efd40 100644 --- a/surfsense_backend/app/schemas/obsidian_plugin.py +++ b/surfsense_backend/app/schemas/obsidian_plugin.py @@ -83,7 +83,6 @@ class ConnectRequest(_PluginBase): vault_id: str vault_name: str search_space_id: int - device_id: str class ConnectResponse(_PluginBase): diff --git a/surfsense_obsidian/src/api-client.ts b/surfsense_obsidian/src/api-client.ts index fda6497c4..9ab626c37 100644 --- a/surfsense_obsidian/src/api-client.ts +++ b/surfsense_obsidian/src/api-client.ts @@ -55,6 +55,14 @@ export class PermanentError extends Error { } } +/** 404 `VAULT_NOT_REGISTERED` — `/connect` hasn't committed yet; retry after reconnect. */ +export class VaultNotRegisteredError extends TransientError { + constructor(message: string) { + super(404, message); + this.name = "VaultNotRegisteredError"; + } +} + export interface ApiClientOptions { getServerUrl: () => string; getToken: () => string; @@ -99,7 +107,6 @@ export class SurfSenseApiClient { searchSpaceId: number; vaultId: string; vaultName: string; - deviceId: string; }): Promise { return await this.request( "POST", @@ -108,7 +115,6 @@ export class SurfSenseApiClient { vault_id: input.vaultId, vault_name: input.vaultName, search_space_id: input.searchSpaceId, - device_id: input.deviceId, } ); } @@ -210,6 +216,10 @@ export class SurfSenseApiClient { throw new TransientError(resp.status, detail || `HTTP ${resp.status}`); } + if (resp.status === 404 && extractCode(resp) === "VAULT_NOT_REGISTERED") { + throw new VaultNotRegisteredError(detail || "Vault not registered yet"); + } + throw new PermanentError(resp.status, detail || `HTTP ${resp.status}`); } } @@ -235,5 +245,20 @@ function extractDetail(resp: RequestUrlResponse): string { const json = safeJson(resp); if (typeof json.detail === "string") return json.detail; if (typeof json.message === "string") return json.message; + const detailObj = json.detail; + if (detailObj && typeof detailObj === "object") { + const obj = detailObj as Record; + if (typeof obj.message === "string") return obj.message; + } return resp.text?.slice(0, 200) ?? ""; } + +function extractCode(resp: RequestUrlResponse): string | undefined { + const json = safeJson(resp); + const detailObj = json.detail; + if (detailObj && typeof detailObj === "object") { + const code = (detailObj as Record).code; + if (typeof code === "string") return code; + } + return undefined; +} diff --git a/surfsense_obsidian/src/main.ts b/surfsense_obsidian/src/main.ts index 512a74a44..0aa25ae58 100644 --- a/surfsense_obsidian/src/main.ts +++ b/surfsense_obsidian/src/main.ts @@ -17,12 +17,6 @@ export default class SurfSensePlugin extends Plugin { api!: SurfSenseApiClient; queue!: PersistentQueue; engine!: SyncEngine; - /** - * Per-install identifier kept in `app.saveLocalStorage` rather than - * `data.json`, so it does NOT travel through Obsidian Sync — each - * machine on a synced vault stays distinguishable. - */ - deviceId = ""; private statusBar: StatusBar | null = null; lastStatus: StatusState = { kind: "idle", queueDepth: 0 }; serverCapabilities: string[] = []; @@ -56,7 +50,6 @@ export default class SurfSensePlugin extends Plugin { await this.saveSettings(); this.settingTab?.renderStatus(); }, - getDeviceId: () => this.deviceId, setStatus: (s) => { this.lastStatus = s; this.statusBar?.update(s); @@ -174,28 +167,11 @@ export default class SurfSensePlugin extends Plugin { await this.saveData(this.settings); } - /** - * Mint vault_id (in data.json, travels with the vault) and device_id - * (in `app.saveLocalStorage`, stays per-install) on first run. - */ + /** Mint vault_id (in data.json, travels with the vault) on first run. */ private seedIdentity(): void { if (!this.settings.vaultId) { this.settings.vaultId = generateUuid(); } - - // loadLocalStorage / saveLocalStorage aren't in the d.ts; cast at the boundary. - const localStore = this.app as unknown as { - loadLocalStorage: (key: string) => string | null; - saveLocalStorage: (key: string, value: string | null) => void; - }; - const storageKey = "surfsense:deviceId"; - let deviceId = localStore.loadLocalStorage(storageKey); - if (!deviceId) { - deviceId = generateUuid(); - localStore.saveLocalStorage(storageKey, deviceId); - } - this.deviceId = deviceId; - if (!this.settings.vaultName) { this.settings.vaultName = this.app.vault.getName(); } diff --git a/surfsense_obsidian/src/sync-engine.ts b/surfsense_obsidian/src/sync-engine.ts index b2c1b0a5a..281b23f0f 100644 --- a/surfsense_obsidian/src/sync-engine.ts +++ b/surfsense_obsidian/src/sync-engine.ts @@ -4,6 +4,7 @@ import { PermanentError, type SurfSenseApiClient, TransientError, + VaultNotRegisteredError, } from "./api-client"; import { isExcluded } from "./excludes"; import { buildNotePayload, computeContentHash } from "./payload"; @@ -29,8 +30,6 @@ export interface SyncEngineDeps { queue: PersistentQueue; getSettings: () => SyncEngineSettings; saveSettings: (mut: (s: SyncEngineSettings) => void) => Promise; - /** Per-install id sourced from app.saveLocalStorage (not synced data.json). */ - getDeviceId: () => string; setStatus: (s: StatusState) => void; onCapabilities: (caps: string[], apiVersion: string) => void; } @@ -89,12 +88,11 @@ export class SyncEngine { return; } - // Re-announce on every load: /connect doubles as the device heartbeat - // that bumps last_seen_at and powers the "Devices: N" tile in the web UI. + // Re-announce so the backend sees the latest vault_name + last_connect_at. + // flushQueue owns the connectorId gate, so a failed connect here still + // leaves the queue stable for the next trigger. await this.ensureConnected(); - if (!this.deps.getSettings().connectorId) return; - await this.flushQueue(); await this.maybeReconcile(); this.setStatus(this.queueStatusKind(), undefined); @@ -112,7 +110,6 @@ export class SyncEngine { searchSpaceId: settings.searchSpaceId, vaultId: settings.vaultId, vaultName: settings.vaultName, - deviceId: this.deps.getDeviceId(), }); this.applyHealth(resp); await this.deps.saveSettings((s) => { @@ -205,6 +202,11 @@ export class SyncEngine { async flushQueue(): Promise { if (this.deps.queue.size === 0) return; + // Shared gate for every flush trigger so the first /sync can't race /connect. + if (!this.deps.getSettings().connectorId) { + await this.ensureConnected(); + if (!this.deps.getSettings().connectorId) return; + } this.setStatus("syncing", `Syncing ${this.deps.queue.size} item(s)…`); const summary = await this.deps.queue.drain({ processBatch: (batch) => this.processBatch(batch), @@ -237,10 +239,14 @@ export class SyncEngine { }); acked.push(...renames); } catch (err) { - const verdict = this.classify(err); - if (verdict === "stop") return { acked, retry: [...retry, ...renames], dropped, stop: true }; - if (verdict === "retry") retry.push(...renames); - else dropped.push(...renames); + if (await this.handleVaultNotRegistered(err)) { + retry.push(...renames); + } else { + const verdict = this.classify(err); + if (verdict === "stop") return { acked, retry: [...retry, ...renames], dropped, stop: true }; + if (verdict === "retry") retry.push(...renames); + else dropped.push(...renames); + } } } @@ -252,10 +258,14 @@ export class SyncEngine { }); acked.push(...deletes); } catch (err) { - const verdict = this.classify(err); - if (verdict === "stop") return { acked, retry: [...retry, ...deletes], dropped, stop: true }; - if (verdict === "retry") retry.push(...deletes); - else dropped.push(...deletes); + if (await this.handleVaultNotRegistered(err)) { + retry.push(...deletes); + } else { + const verdict = this.classify(err); + if (verdict === "stop") return { acked, retry: [...retry, ...deletes], dropped, stop: true }; + if (verdict === "retry") retry.push(...deletes); + else dropped.push(...deletes); + } } } @@ -292,11 +302,18 @@ export class SyncEngine { else acked.push(item); } } catch (err) { - const verdict = this.classify(err); - if (verdict === "stop") - return { acked, retry: [...retry, ...upserts], dropped, stop: true }; - if (verdict === "retry") retry.push(...upserts); - else dropped.push(...upserts); + if (await this.handleVaultNotRegistered(err)) { + for (const item of upserts) { + if (retry.find((r) => r === item)) continue; + retry.push(item); + } + } else { + const verdict = this.classify(err); + if (verdict === "stop") + return { acked, retry: [...retry, ...upserts], dropped, stop: true }; + if (verdict === "retry") retry.push(...upserts); + else dropped.push(...upserts); + } } } } @@ -427,6 +444,14 @@ export class SyncEngine { this.setStatus("error", (err as Error).message ?? "Unknown error"); } + /** Re-connect on VAULT_NOT_REGISTERED so the next drain sees the new row. */ + private async handleVaultNotRegistered(err: unknown): Promise { + if (!(err instanceof VaultNotRegisteredError)) return false; + console.warn("SurfSense: vault not registered, re-connecting before retry", err); + await this.ensureConnected(); + return true; + } + private classify(err: unknown): "ack" | "retry" | "drop" | "stop" { if (err instanceof AuthError) { this.setStatus("auth-error", err.message); diff --git a/surfsense_obsidian/src/types.ts b/surfsense_obsidian/src/types.ts index 33b0d01a7..c37868733 100644 --- a/surfsense_obsidian/src/types.ts +++ b/surfsense_obsidian/src/types.ts @@ -8,8 +8,6 @@ export interface SurfsensePluginSettings { /** UUID for the vault — lives here so Obsidian Sync replicates it across devices. */ vaultId: string; vaultName: string; - // Per-install deviceId is NOT in this interface on purpose: it lives in - // app.saveLocalStorage so it stays distinct on each device. See seedIdentity(). syncMode: "auto" | "manual"; excludePatterns: string[]; includeAttachments: boolean; diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/obsidian-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/obsidian-config.tsx index 7bfb60f50..6c81353ee 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/obsidian-config.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/obsidian-config.tsx @@ -1,9 +1,10 @@ "use client"; import { AlertTriangle, Download, Info } from "lucide-react"; -import { type FC, useMemo } from "react"; +import { type FC, useEffect, useMemo, useState } from "react"; import { Alert, AlertDescription, AlertTitle } from "@/components/ui/alert"; import { Button } from "@/components/ui/button"; +import { connectorsApiService, type ObsidianStats } from "@/lib/apis/connectors-api.service"; import type { ConnectorConfigProps } from "../index"; const PLUGIN_RELEASES_URL = @@ -98,29 +99,49 @@ const LegacyBanner: FC = () => { }; const PluginStats: FC<{ config: Record }> = ({ config }) => { - const stats: { label: string; value: string }[] = useMemo(() => { - const filesSynced = config.files_synced; - // Derive from config.devices — a stored counter could drift under concurrent heartbeats. - const deviceCount = - config.devices && typeof config.devices === "object" - ? Object.keys(config.devices as Record).length - : null; + const vaultId = typeof config.vault_id === "string" ? config.vault_id : null; + const [stats, setStats] = useState(null); + const [statsError, setStatsError] = useState(false); + + useEffect(() => { + if (!vaultId) return; + let cancelled = false; + setStats(null); + setStatsError(false); + connectorsApiService + .getObsidianStats(vaultId) + .then((result) => { + if (!cancelled) setStats(result); + }) + .catch((err) => { + if (!cancelled) { + console.error("Failed to fetch Obsidian stats", err); + setStatsError(true); + } + }); + return () => { + cancelled = true; + }; + }, [vaultId]); + + const tileRows = useMemo(() => { + const placeholder = statsError ? "—" : stats ? null : "…"; return [ { label: "Vault", value: (config.vault_name as string) || "—" }, - { - label: "Devices", - value: deviceCount !== null ? deviceCount.toLocaleString() : "—", - }, { label: "Last sync", - value: formatTimestamp(config.last_sync_at), + value: placeholder ?? formatTimestamp(stats?.last_sync_at ?? null), }, { label: "Files synced", - value: typeof filesSynced === "number" ? filesSynced.toLocaleString() : "—", + value: + placeholder ?? + (typeof stats?.files_synced === "number" + ? stats.files_synced.toLocaleString() + : "—"), }, ]; - }, [config]); + }, [config.vault_name, stats, statsError]); return (
@@ -136,7 +157,7 @@ const PluginStats: FC<{ config: Record }> = ({ config }) => {

Vault status

- {stats.map((stat) => ( + {tileRows.map((stat) => (
=> { + return baseApiService.get( + `/api/v1/obsidian/stats?vault_id=${encodeURIComponent(vaultId)}` + ); + }; +} + +export interface ObsidianStats { + vault_id: string; + files_synced: number; + last_sync_at: string | null; } export type { SlackChannel, DiscordChannel };