feat: enhance Obsidian plugin with new stats feature, improved error handling, and streamlined device management

This commit is contained in:
Anish Sarkar 2026-04-20 21:07:15 +05:30
parent 54e66e131a
commit 2251e464c7
8 changed files with 178 additions and 137 deletions

View file

@ -11,11 +11,13 @@ import logging
from datetime import UTC, datetime
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import and_
from sqlalchemy import and_, case, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.db import (
Document,
DocumentType,
SearchSourceConnector,
SearchSourceConnectorType,
SearchSpace,
@ -48,7 +50,7 @@ router = APIRouter(prefix="/obsidian", tags=["obsidian-plugin"])
OBSIDIAN_API_VERSION = "1"
# Plugins feature-gate on these. Add entries, never rename or remove.
OBSIDIAN_CAPABILITIES: list[str] = ["sync", "rename", "delete", "manifest"]
OBSIDIAN_CAPABILITIES: list[str] = ["sync", "rename", "delete", "manifest", "stats"]
# ---------------------------------------------------------------------------
@ -63,47 +65,13 @@ def _build_handshake() -> dict[str, object]:
}
def _upsert_device(
existing_devices: object,
device_id: str,
now_iso: str,
) -> dict[str, dict[str, str]]:
"""Upsert ``device_id`` into ``{device_id: {first_seen_at, last_seen_at}}``.
Keyed by device_id for O(1) dedup; ``len(devices)`` is the count.
Timestamps are kept for a future stale-device pruner.
"""
devices: dict[str, dict[str, str]] = {}
if isinstance(existing_devices, dict):
for key, val in existing_devices.items():
if not isinstance(key, str) or not key or not isinstance(val, dict):
continue
devices[key] = {
"first_seen_at": str(val.get("first_seen_at") or now_iso),
"last_seen_at": str(val.get("last_seen_at") or now_iso),
}
prev = devices.get(device_id)
devices[device_id] = {
"first_seen_at": prev["first_seen_at"] if prev else now_iso,
"last_seen_at": now_iso,
}
return devices
async def _resolve_vault_connector(
session: AsyncSession,
*,
user: User,
vault_id: str,
for_update: bool = False,
) -> SearchSourceConnector:
"""Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user.
Callers that mutate ``connector.config`` MUST pass ``for_update=True`` or
concurrent heartbeats will race and lose writes on ``config.devices`` /
``config.files_synced``.
"""
"""Find the OBSIDIAN_CONNECTOR row that owns ``vault_id`` for this user."""
stmt = select(SearchSourceConnector).where(
and_(
SearchSourceConnector.user_id == user.id,
@ -113,8 +81,6 @@ async def _resolve_vault_connector(
SearchSourceConnector.config["source"].astext == "plugin",
)
)
if for_update:
stmt = stmt.with_for_update()
connector = (await session.execute(stmt)).scalars().first()
if connector is not None:
@ -182,14 +148,13 @@ async def obsidian_connect(
"""Register a vault, or return the existing connector row.
Idempotent on (user_id, OBSIDIAN_CONNECTOR, vault_id). Called on every
plugin onload as a heartbeat upserts ``device_id`` into
``config['devices']`` so the web UI can show a "Devices: N" tile.
plugin onload as a heartbeat.
"""
await _ensure_search_space_access(
session, user=user, search_space_id=payload.search_space_id
)
# FOR UPDATE so concurrent heartbeats can't clobber each other's device entry.
# FOR UPDATE so concurrent /connect calls for the same vault can't race.
existing: SearchSourceConnector | None = (
(
await session.execute(
@ -211,19 +176,14 @@ async def obsidian_connect(
)
now_iso = datetime.now(UTC).isoformat()
cfg = {
"vault_id": payload.vault_id,
"vault_name": payload.vault_name,
"source": "plugin",
"last_connect_at": now_iso,
}
if existing is not None:
cfg = dict(existing.config or {})
devices = _upsert_device(cfg.get("devices"), payload.device_id, now_iso)
cfg.update(
{
"vault_id": payload.vault_id,
"vault_name": payload.vault_name,
"source": "plugin",
"devices": devices,
"last_connect_at": now_iso,
}
)
existing.config = cfg
# Re-stamp on every connect so vault renames in Obsidian propagate;
# the web UI hides the Name input for Obsidian connectors.
@ -234,19 +194,11 @@ async def obsidian_connect(
await session.refresh(existing)
connector = existing
else:
devices = _upsert_device(None, payload.device_id, now_iso)
connector = SearchSourceConnector(
name=f"Obsidian — {payload.vault_name}",
connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
is_indexable=False,
config={
"vault_id": payload.vault_id,
"vault_name": payload.vault_name,
"source": "plugin",
"devices": devices,
"files_synced": 0,
"last_connect_at": now_iso,
},
config=cfg,
user_id=user.id,
search_space_id=payload.search_space_id,
)
@ -270,7 +222,7 @@ async def obsidian_sync(
) -> dict[str, object]:
"""Batch-upsert notes; returns per-note ack so the plugin can dequeue/retry."""
connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id, for_update=True
session, user=user, vault_id=payload.vault_id
)
results: list[dict[str, object]] = []
@ -299,12 +251,6 @@ async def obsidian_sync(
{"path": note.path, "status": "error", "error": str(exc)[:300]}
)
cfg = dict(connector.config or {})
cfg["last_sync_at"] = datetime.now(UTC).isoformat()
cfg["files_synced"] = int(cfg.get("files_synced", 0)) + indexed
connector.config = cfg
await session.commit()
return {
"vault_id": payload.vault_id,
"indexed": indexed,
@ -321,7 +267,7 @@ async def obsidian_rename(
) -> dict[str, object]:
"""Apply a batch of vault rename events."""
connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id, for_update=True
session, user=user, vault_id=payload.vault_id
)
results: list[dict[str, object]] = []
@ -388,7 +334,7 @@ async def obsidian_delete_notes(
) -> dict[str, object]:
"""Soft-delete a batch of notes by vault-relative path."""
connector = await _resolve_vault_connector(
session, user=user, vault_id=payload.vault_id, for_update=True
session, user=user, vault_id=payload.vault_id
)
deleted = 0
@ -437,3 +383,41 @@ async def obsidian_manifest(
session, user=user, vault_id=vault_id
)
return await get_manifest(session, connector=connector, vault_id=vault_id)
@router.get("/stats")
async def obsidian_stats(
vault_id: str = Query(..., description="Plugin-side stable vault UUID"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> dict[str, object]:
"""Active-note count + last sync time for the web tile.
``files_synced`` excludes tombstones so it matches ``/manifest``;
``last_sync_at`` includes them so deletes advance the freshness signal.
"""
connector = await _resolve_vault_connector(
session, user=user, vault_id=vault_id
)
is_active = Document.document_metadata["deleted_at"].astext.is_(None)
row = (
await session.execute(
select(
func.count(case((is_active, 1))).label("files_synced"),
func.max(Document.updated_at).label("last_sync_at"),
).where(
and_(
Document.connector_id == connector.id,
Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
)
)
)
).first()
return {
"vault_id": vault_id,
"files_synced": int(row[0] or 0),
"last_sync_at": row[1].isoformat() if row[1] else None,
}

View file

@ -83,7 +83,6 @@ class ConnectRequest(_PluginBase):
vault_id: str
vault_name: str
search_space_id: int
device_id: str
class ConnectResponse(_PluginBase):

View file

@ -55,6 +55,14 @@ export class PermanentError extends Error {
}
}
/** 404 `VAULT_NOT_REGISTERED` — `/connect` hasn't committed yet; retry after reconnect. */
export class VaultNotRegisteredError extends TransientError {
constructor(message: string) {
super(404, message);
this.name = "VaultNotRegisteredError";
}
}
export interface ApiClientOptions {
getServerUrl: () => string;
getToken: () => string;
@ -99,7 +107,6 @@ export class SurfSenseApiClient {
searchSpaceId: number;
vaultId: string;
vaultName: string;
deviceId: string;
}): Promise<ConnectResponse> {
return await this.request<ConnectResponse>(
"POST",
@ -108,7 +115,6 @@ export class SurfSenseApiClient {
vault_id: input.vaultId,
vault_name: input.vaultName,
search_space_id: input.searchSpaceId,
device_id: input.deviceId,
}
);
}
@ -210,6 +216,10 @@ export class SurfSenseApiClient {
throw new TransientError(resp.status, detail || `HTTP ${resp.status}`);
}
if (resp.status === 404 && extractCode(resp) === "VAULT_NOT_REGISTERED") {
throw new VaultNotRegisteredError(detail || "Vault not registered yet");
}
throw new PermanentError(resp.status, detail || `HTTP ${resp.status}`);
}
}
@ -235,5 +245,20 @@ function extractDetail(resp: RequestUrlResponse): string {
const json = safeJson(resp);
if (typeof json.detail === "string") return json.detail;
if (typeof json.message === "string") return json.message;
const detailObj = json.detail;
if (detailObj && typeof detailObj === "object") {
const obj = detailObj as Record<string, unknown>;
if (typeof obj.message === "string") return obj.message;
}
return resp.text?.slice(0, 200) ?? "";
}
function extractCode(resp: RequestUrlResponse): string | undefined {
const json = safeJson(resp);
const detailObj = json.detail;
if (detailObj && typeof detailObj === "object") {
const code = (detailObj as Record<string, unknown>).code;
if (typeof code === "string") return code;
}
return undefined;
}

View file

@ -17,12 +17,6 @@ export default class SurfSensePlugin extends Plugin {
api!: SurfSenseApiClient;
queue!: PersistentQueue;
engine!: SyncEngine;
/**
* Per-install identifier kept in `app.saveLocalStorage` rather than
* `data.json`, so it does NOT travel through Obsidian Sync each
* machine on a synced vault stays distinguishable.
*/
deviceId = "";
private statusBar: StatusBar | null = null;
lastStatus: StatusState = { kind: "idle", queueDepth: 0 };
serverCapabilities: string[] = [];
@ -56,7 +50,6 @@ export default class SurfSensePlugin extends Plugin {
await this.saveSettings();
this.settingTab?.renderStatus();
},
getDeviceId: () => this.deviceId,
setStatus: (s) => {
this.lastStatus = s;
this.statusBar?.update(s);
@ -174,28 +167,11 @@ export default class SurfSensePlugin extends Plugin {
await this.saveData(this.settings);
}
/**
* Mint vault_id (in data.json, travels with the vault) and device_id
* (in `app.saveLocalStorage`, stays per-install) on first run.
*/
/** Mint vault_id (in data.json, travels with the vault) on first run. */
private seedIdentity(): void {
if (!this.settings.vaultId) {
this.settings.vaultId = generateUuid();
}
// loadLocalStorage / saveLocalStorage aren't in the d.ts; cast at the boundary.
const localStore = this.app as unknown as {
loadLocalStorage: (key: string) => string | null;
saveLocalStorage: (key: string, value: string | null) => void;
};
const storageKey = "surfsense:deviceId";
let deviceId = localStore.loadLocalStorage(storageKey);
if (!deviceId) {
deviceId = generateUuid();
localStore.saveLocalStorage(storageKey, deviceId);
}
this.deviceId = deviceId;
if (!this.settings.vaultName) {
this.settings.vaultName = this.app.vault.getName();
}

View file

@ -4,6 +4,7 @@ import {
PermanentError,
type SurfSenseApiClient,
TransientError,
VaultNotRegisteredError,
} from "./api-client";
import { isExcluded } from "./excludes";
import { buildNotePayload, computeContentHash } from "./payload";
@ -29,8 +30,6 @@ export interface SyncEngineDeps {
queue: PersistentQueue;
getSettings: () => SyncEngineSettings;
saveSettings: (mut: (s: SyncEngineSettings) => void) => Promise<void>;
/** Per-install id sourced from app.saveLocalStorage (not synced data.json). */
getDeviceId: () => string;
setStatus: (s: StatusState) => void;
onCapabilities: (caps: string[], apiVersion: string) => void;
}
@ -89,12 +88,11 @@ export class SyncEngine {
return;
}
// Re-announce on every load: /connect doubles as the device heartbeat
// that bumps last_seen_at and powers the "Devices: N" tile in the web UI.
// Re-announce so the backend sees the latest vault_name + last_connect_at.
// flushQueue owns the connectorId gate, so a failed connect here still
// leaves the queue stable for the next trigger.
await this.ensureConnected();
if (!this.deps.getSettings().connectorId) return;
await this.flushQueue();
await this.maybeReconcile();
this.setStatus(this.queueStatusKind(), undefined);
@ -112,7 +110,6 @@ export class SyncEngine {
searchSpaceId: settings.searchSpaceId,
vaultId: settings.vaultId,
vaultName: settings.vaultName,
deviceId: this.deps.getDeviceId(),
});
this.applyHealth(resp);
await this.deps.saveSettings((s) => {
@ -205,6 +202,11 @@ export class SyncEngine {
async flushQueue(): Promise<void> {
if (this.deps.queue.size === 0) return;
// Shared gate for every flush trigger so the first /sync can't race /connect.
if (!this.deps.getSettings().connectorId) {
await this.ensureConnected();
if (!this.deps.getSettings().connectorId) return;
}
this.setStatus("syncing", `Syncing ${this.deps.queue.size} item(s)…`);
const summary = await this.deps.queue.drain({
processBatch: (batch) => this.processBatch(batch),
@ -237,10 +239,14 @@ export class SyncEngine {
});
acked.push(...renames);
} catch (err) {
const verdict = this.classify(err);
if (verdict === "stop") return { acked, retry: [...retry, ...renames], dropped, stop: true };
if (verdict === "retry") retry.push(...renames);
else dropped.push(...renames);
if (await this.handleVaultNotRegistered(err)) {
retry.push(...renames);
} else {
const verdict = this.classify(err);
if (verdict === "stop") return { acked, retry: [...retry, ...renames], dropped, stop: true };
if (verdict === "retry") retry.push(...renames);
else dropped.push(...renames);
}
}
}
@ -252,10 +258,14 @@ export class SyncEngine {
});
acked.push(...deletes);
} catch (err) {
const verdict = this.classify(err);
if (verdict === "stop") return { acked, retry: [...retry, ...deletes], dropped, stop: true };
if (verdict === "retry") retry.push(...deletes);
else dropped.push(...deletes);
if (await this.handleVaultNotRegistered(err)) {
retry.push(...deletes);
} else {
const verdict = this.classify(err);
if (verdict === "stop") return { acked, retry: [...retry, ...deletes], dropped, stop: true };
if (verdict === "retry") retry.push(...deletes);
else dropped.push(...deletes);
}
}
}
@ -292,11 +302,18 @@ export class SyncEngine {
else acked.push(item);
}
} catch (err) {
const verdict = this.classify(err);
if (verdict === "stop")
return { acked, retry: [...retry, ...upserts], dropped, stop: true };
if (verdict === "retry") retry.push(...upserts);
else dropped.push(...upserts);
if (await this.handleVaultNotRegistered(err)) {
for (const item of upserts) {
if (retry.find((r) => r === item)) continue;
retry.push(item);
}
} else {
const verdict = this.classify(err);
if (verdict === "stop")
return { acked, retry: [...retry, ...upserts], dropped, stop: true };
if (verdict === "retry") retry.push(...upserts);
else dropped.push(...upserts);
}
}
}
}
@ -427,6 +444,14 @@ export class SyncEngine {
this.setStatus("error", (err as Error).message ?? "Unknown error");
}
/** Re-connect on VAULT_NOT_REGISTERED so the next drain sees the new row. */
private async handleVaultNotRegistered(err: unknown): Promise<boolean> {
if (!(err instanceof VaultNotRegisteredError)) return false;
console.warn("SurfSense: vault not registered, re-connecting before retry", err);
await this.ensureConnected();
return true;
}
private classify(err: unknown): "ack" | "retry" | "drop" | "stop" {
if (err instanceof AuthError) {
this.setStatus("auth-error", err.message);

View file

@ -8,8 +8,6 @@ export interface SurfsensePluginSettings {
/** UUID for the vault — lives here so Obsidian Sync replicates it across devices. */
vaultId: string;
vaultName: string;
// Per-install deviceId is NOT in this interface on purpose: it lives in
// app.saveLocalStorage so it stays distinct on each device. See seedIdentity().
syncMode: "auto" | "manual";
excludePatterns: string[];
includeAttachments: boolean;

View file

@ -1,9 +1,10 @@
"use client";
import { AlertTriangle, Download, Info } from "lucide-react";
import { type FC, useMemo } from "react";
import { type FC, useEffect, useMemo, useState } from "react";
import { Alert, AlertDescription, AlertTitle } from "@/components/ui/alert";
import { Button } from "@/components/ui/button";
import { connectorsApiService, type ObsidianStats } from "@/lib/apis/connectors-api.service";
import type { ConnectorConfigProps } from "../index";
const PLUGIN_RELEASES_URL =
@ -98,29 +99,49 @@ const LegacyBanner: FC = () => {
};
const PluginStats: FC<{ config: Record<string, unknown> }> = ({ config }) => {
const stats: { label: string; value: string }[] = useMemo(() => {
const filesSynced = config.files_synced;
// Derive from config.devices — a stored counter could drift under concurrent heartbeats.
const deviceCount =
config.devices && typeof config.devices === "object"
? Object.keys(config.devices as Record<string, unknown>).length
: null;
const vaultId = typeof config.vault_id === "string" ? config.vault_id : null;
const [stats, setStats] = useState<ObsidianStats | null>(null);
const [statsError, setStatsError] = useState(false);
useEffect(() => {
if (!vaultId) return;
let cancelled = false;
setStats(null);
setStatsError(false);
connectorsApiService
.getObsidianStats(vaultId)
.then((result) => {
if (!cancelled) setStats(result);
})
.catch((err) => {
if (!cancelled) {
console.error("Failed to fetch Obsidian stats", err);
setStatsError(true);
}
});
return () => {
cancelled = true;
};
}, [vaultId]);
const tileRows = useMemo(() => {
const placeholder = statsError ? "—" : stats ? null : "…";
return [
{ label: "Vault", value: (config.vault_name as string) || "—" },
{
label: "Devices",
value: deviceCount !== null ? deviceCount.toLocaleString() : "—",
},
{
label: "Last sync",
value: formatTimestamp(config.last_sync_at),
value: placeholder ?? formatTimestamp(stats?.last_sync_at ?? null),
},
{
label: "Files synced",
value: typeof filesSynced === "number" ? filesSynced.toLocaleString() : "—",
value:
placeholder ??
(typeof stats?.files_synced === "number"
? stats.files_synced.toLocaleString()
: "—"),
},
];
}, [config]);
}, [config.vault_name, stats, statsError]);
return (
<div className="space-y-4">
@ -136,7 +157,7 @@ const PluginStats: FC<{ config: Record<string, unknown> }> = ({ config }) => {
<div className="rounded-xl border border-border bg-slate-400/5 p-3 sm:p-6 dark:bg-white/5">
<h3 className="mb-3 text-sm font-medium sm:text-base">Vault status</h3>
<dl className="grid grid-cols-1 gap-3 sm:grid-cols-2">
{stats.map((stat) => (
{tileRows.map((stat) => (
<div
key={stat.label}
className="rounded-lg border border-slate-400/20 bg-background/50 p-3"

View file

@ -443,6 +443,19 @@ class ConnectorsApiService {
body: JSON.stringify({ tool_name: toolName }),
});
};
/** Live stats for the Obsidian connector tile. */
getObsidianStats = async (vaultId: string): Promise<ObsidianStats> => {
return baseApiService.get<ObsidianStats>(
`/api/v1/obsidian/stats?vault_id=${encodeURIComponent(vaultId)}`
);
};
}
export interface ObsidianStats {
vault_id: string;
files_synced: number;
last_sync_at: string | null;
}
export type { SlackChannel, DiscordChannel };