mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-08 20:25:19 +02:00
feat: implement cross-device deduplication for Obsidian connectors using vault fingerprinting and enhance connector management
This commit is contained in:
parent
2d90ed0fec
commit
54ce2666f5
10 changed files with 486 additions and 92 deletions
|
|
@ -1,14 +1,18 @@
|
|||
"""129_deactivate_legacy_obsidian_connectors
|
||||
"""129_obsidian_plugin_vault_identity
|
||||
|
||||
Revision ID: 129
|
||||
Revises: 128
|
||||
Create Date: 2026-04-18
|
||||
Create Date: 2026-04-21
|
||||
|
||||
Deactivates pre-plugin OBSIDIAN_CONNECTOR rows (keeping them and their
|
||||
Documents, but flagging ``config.legacy = true`` and disabling scheduling)
|
||||
and creates the partial unique index on
|
||||
``(user_id, (config->>'vault_id'))`` for plugin-Obsidian rows that backs
|
||||
the ``/obsidian/connect`` upsert.
|
||||
Locks down vault identity for the Obsidian plugin connector:
|
||||
|
||||
- Deactivates pre-plugin OBSIDIAN_CONNECTOR rows.
|
||||
- Partial unique index on ``(user_id, (config->>'vault_id'))`` for the
|
||||
``/obsidian/connect`` upsert fast path.
|
||||
- Partial unique index on ``(user_id, (config->>'vault_fingerprint'))``
|
||||
so two devices observing the same vault content can never produce
|
||||
two connector rows. Collisions are caught by the route handler and
|
||||
routed through the merge path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
|
@ -27,6 +31,7 @@ depends_on: str | Sequence[str] | None = None
|
|||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
|
||||
conn.execute(
|
||||
sa.text(
|
||||
"""
|
||||
|
|
@ -61,9 +66,27 @@ def upgrade() -> None:
|
|||
)
|
||||
)
|
||||
|
||||
conn.execute(
|
||||
sa.text(
|
||||
"""
|
||||
CREATE UNIQUE INDEX search_source_connectors_obsidian_plugin_fingerprint_uniq
|
||||
ON search_source_connectors (user_id, ((config->>'vault_fingerprint')))
|
||||
WHERE connector_type = 'OBSIDIAN_CONNECTOR'
|
||||
AND config->>'source' = 'plugin'
|
||||
AND config->>'vault_fingerprint' IS NOT NULL
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
conn.execute(
|
||||
sa.text(
|
||||
"DROP INDEX IF EXISTS "
|
||||
"search_source_connectors_obsidian_plugin_fingerprint_uniq"
|
||||
)
|
||||
)
|
||||
conn.execute(
|
||||
sa.text(
|
||||
"DROP INDEX IF EXISTS "
|
||||
|
|
@ -1522,6 +1522,19 @@ class SearchSourceConnector(BaseModel, TimestampMixin):
|
|||
"AND config->>'vault_id' IS NOT NULL"
|
||||
),
|
||||
),
|
||||
# Cross-device dedup: same vault content from different devices
|
||||
# cannot produce two connector rows.
|
||||
Index(
|
||||
"search_source_connectors_obsidian_plugin_fingerprint_uniq",
|
||||
"user_id",
|
||||
text("(config->>'vault_fingerprint')"),
|
||||
unique=True,
|
||||
postgresql_where=text(
|
||||
"connector_type = 'OBSIDIAN_CONNECTOR' "
|
||||
"AND config->>'source' = 'plugin' "
|
||||
"AND config->>'vault_fingerprint' IS NOT NULL"
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
name = Column(String(100), nullable=False, index=True)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ from __future__ import annotations
|
|||
import logging
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import sqlalchemy as sa
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
from sqlalchemy import and_, case, func
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
|
|
@ -45,6 +44,7 @@ from app.schemas.obsidian_plugin import (
|
|||
from app.services.obsidian_plugin_indexer import (
|
||||
delete_note,
|
||||
get_manifest,
|
||||
merge_obsidian_connectors,
|
||||
rename_note,
|
||||
upsert_note,
|
||||
)
|
||||
|
|
@ -144,33 +144,139 @@ async def obsidian_health(
|
|||
)
|
||||
|
||||
|
||||
async def _find_by_vault_id(
|
||||
session: AsyncSession, *, user_id, vault_id: str
|
||||
) -> SearchSourceConnector | None:
|
||||
stmt = select(SearchSourceConnector).where(
|
||||
and_(
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
|
||||
SearchSourceConnector.config["source"].as_string() == "plugin",
|
||||
SearchSourceConnector.config["vault_id"].as_string() == vault_id,
|
||||
)
|
||||
)
|
||||
return (await session.execute(stmt)).scalars().first()
|
||||
|
||||
|
||||
async def _find_by_fingerprint(
|
||||
session: AsyncSession, *, user_id, vault_fingerprint: str
|
||||
) -> SearchSourceConnector | None:
|
||||
stmt = select(SearchSourceConnector).where(
|
||||
and_(
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
|
||||
SearchSourceConnector.config["source"].as_string() == "plugin",
|
||||
SearchSourceConnector.config["vault_fingerprint"].as_string()
|
||||
== vault_fingerprint,
|
||||
)
|
||||
)
|
||||
return (await session.execute(stmt)).scalars().first()
|
||||
|
||||
|
||||
def _build_config(
|
||||
payload: ConnectRequest, *, now_iso: str
|
||||
) -> dict[str, object]:
|
||||
return {
|
||||
"vault_id": payload.vault_id,
|
||||
"vault_name": payload.vault_name,
|
||||
"vault_fingerprint": payload.vault_fingerprint,
|
||||
"source": "plugin",
|
||||
"last_connect_at": now_iso,
|
||||
}
|
||||
|
||||
|
||||
def _display_name(vault_name: str) -> str:
|
||||
return f"Obsidian \u2014 {vault_name}"
|
||||
|
||||
|
||||
@router.post("/connect", response_model=ConnectResponse)
|
||||
async def obsidian_connect(
|
||||
payload: ConnectRequest,
|
||||
user: User = Depends(current_active_user),
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
) -> ConnectResponse:
|
||||
"""Register a vault, or refresh the existing connector row.
|
||||
"""Register a vault, refresh an existing one, or adopt another device's row.
|
||||
|
||||
Idempotent on ``(user_id, OBSIDIAN_CONNECTOR, vault_id)`` via the partial
|
||||
unique index from migration 129. Called on every plugin onload.
|
||||
Resolution order:
|
||||
1. ``(user_id, vault_id)`` → known device, refresh metadata.
|
||||
2. ``(user_id, vault_fingerprint)`` → another device of the same vault,
|
||||
caller adopts the surviving ``vault_id``.
|
||||
3. Insert a new row.
|
||||
|
||||
Fingerprint collisions on (1) trigger ``merge_obsidian_connectors`` so
|
||||
the partial unique index can never produce two live rows for one vault.
|
||||
"""
|
||||
await _ensure_search_space_access(
|
||||
session, user=user, search_space_id=payload.search_space_id
|
||||
)
|
||||
|
||||
now_iso = datetime.now(UTC).isoformat()
|
||||
cfg = {
|
||||
"vault_id": payload.vault_id,
|
||||
"vault_name": payload.vault_name,
|
||||
"source": "plugin",
|
||||
"last_connect_at": now_iso,
|
||||
}
|
||||
display_name = f"Obsidian \u2014 {payload.vault_name}"
|
||||
cfg = _build_config(payload, now_iso=now_iso)
|
||||
display_name = _display_name(payload.vault_name)
|
||||
|
||||
# ``index_elements`` + ``index_where`` matches the partial unique index
|
||||
# by shape; ``ON CONFLICT ON CONSTRAINT`` doesn't work for partial indexes.
|
||||
stmt = (
|
||||
existing_by_vid = await _find_by_vault_id(
|
||||
session, user_id=user.id, vault_id=payload.vault_id
|
||||
)
|
||||
if existing_by_vid is not None:
|
||||
collision = await _find_by_fingerprint(
|
||||
session, user_id=user.id, vault_fingerprint=payload.vault_fingerprint
|
||||
)
|
||||
if collision is not None and collision.id != existing_by_vid.id:
|
||||
await merge_obsidian_connectors(
|
||||
session, source=existing_by_vid, target=collision
|
||||
)
|
||||
collision_cfg = dict(collision.config or {})
|
||||
collision_cfg["vault_name"] = payload.vault_name
|
||||
collision_cfg["last_connect_at"] = now_iso
|
||||
collision.config = collision_cfg
|
||||
collision.name = _display_name(payload.vault_name)
|
||||
response = ConnectResponse(
|
||||
connector_id=collision.id,
|
||||
vault_id=collision_cfg["vault_id"],
|
||||
search_space_id=collision.search_space_id,
|
||||
**_build_handshake(),
|
||||
)
|
||||
await session.commit()
|
||||
return response
|
||||
|
||||
existing_by_vid.name = display_name
|
||||
existing_by_vid.config = cfg
|
||||
existing_by_vid.search_space_id = payload.search_space_id
|
||||
existing_by_vid.is_indexable = False
|
||||
response = ConnectResponse(
|
||||
connector_id=existing_by_vid.id,
|
||||
vault_id=payload.vault_id,
|
||||
search_space_id=existing_by_vid.search_space_id,
|
||||
**_build_handshake(),
|
||||
)
|
||||
await session.commit()
|
||||
return response
|
||||
|
||||
existing_by_fp = await _find_by_fingerprint(
|
||||
session, user_id=user.id, vault_fingerprint=payload.vault_fingerprint
|
||||
)
|
||||
if existing_by_fp is not None:
|
||||
survivor_cfg = dict(existing_by_fp.config or {})
|
||||
survivor_cfg["vault_name"] = payload.vault_name
|
||||
survivor_cfg["last_connect_at"] = now_iso
|
||||
existing_by_fp.config = survivor_cfg
|
||||
existing_by_fp.name = display_name
|
||||
response = ConnectResponse(
|
||||
connector_id=existing_by_fp.id,
|
||||
vault_id=survivor_cfg["vault_id"],
|
||||
search_space_id=existing_by_fp.search_space_id,
|
||||
**_build_handshake(),
|
||||
)
|
||||
await session.commit()
|
||||
return response
|
||||
|
||||
# ON CONFLICT DO NOTHING matches any unique index (vault_id OR
|
||||
# fingerprint), so concurrent first-time connects from two devices
|
||||
# of the same vault never raise IntegrityError — the loser just
|
||||
# gets an empty RETURNING and falls through to re-fetch the winner.
|
||||
insert_stmt = (
|
||||
pg_insert(SearchSourceConnector)
|
||||
.values(
|
||||
name=display_name,
|
||||
|
|
@ -180,40 +286,43 @@ async def obsidian_connect(
|
|||
user_id=user.id,
|
||||
search_space_id=payload.search_space_id,
|
||||
)
|
||||
.on_conflict_do_update(
|
||||
index_elements=[
|
||||
SearchSourceConnector.user_id,
|
||||
sa.text("(config->>'vault_id')"),
|
||||
],
|
||||
index_where=sa.text(
|
||||
"connector_type = 'OBSIDIAN_CONNECTOR' "
|
||||
"AND config->>'source' = 'plugin' "
|
||||
"AND config->>'vault_id' IS NOT NULL"
|
||||
),
|
||||
set_={
|
||||
"name": display_name,
|
||||
"config": cfg,
|
||||
"search_space_id": payload.search_space_id,
|
||||
"is_indexable": False,
|
||||
},
|
||||
.on_conflict_do_nothing()
|
||||
.returning(
|
||||
SearchSourceConnector.id,
|
||||
SearchSourceConnector.search_space_id,
|
||||
)
|
||||
.returning(SearchSourceConnector)
|
||||
)
|
||||
inserted = (await session.execute(insert_stmt)).first()
|
||||
if inserted is not None:
|
||||
response = ConnectResponse(
|
||||
connector_id=inserted.id,
|
||||
vault_id=payload.vault_id,
|
||||
search_space_id=inserted.search_space_id,
|
||||
**_build_handshake(),
|
||||
)
|
||||
await session.commit()
|
||||
return response
|
||||
|
||||
result = await session.execute(stmt)
|
||||
connector = result.scalar_one()
|
||||
# Read attrs before commit; ``expire_on_commit=True`` would force a
|
||||
# lazy refresh that fails with ``MissingGreenlet`` during serialization.
|
||||
connector_id = connector.id
|
||||
connector_search_space_id = connector.search_space_id
|
||||
await session.commit()
|
||||
|
||||
return ConnectResponse(
|
||||
connector_id=connector_id,
|
||||
vault_id=payload.vault_id,
|
||||
search_space_id=connector_search_space_id,
|
||||
winner = await _find_by_fingerprint(
|
||||
session, user_id=user.id, vault_fingerprint=payload.vault_fingerprint
|
||||
)
|
||||
if winner is None:
|
||||
winner = await _find_by_vault_id(
|
||||
session, user_id=user.id, vault_id=payload.vault_id
|
||||
)
|
||||
if winner is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail="vault registration conflicted but winning row could not be located",
|
||||
)
|
||||
response = ConnectResponse(
|
||||
connector_id=winner.id,
|
||||
vault_id=(winner.config or {})["vault_id"],
|
||||
search_space_id=winner.search_space_id,
|
||||
**_build_handshake(),
|
||||
)
|
||||
await session.commit()
|
||||
return response
|
||||
|
||||
|
||||
@router.post("/sync", response_model=SyncAck)
|
||||
|
|
|
|||
|
|
@ -92,6 +92,14 @@ class ConnectRequest(_PluginBase):
|
|||
vault_id: str
|
||||
vault_name: str
|
||||
search_space_id: int
|
||||
vault_fingerprint: str = Field(
|
||||
...,
|
||||
description=(
|
||||
"Deterministic SHA-256 over the sorted markdown paths in the vault "
|
||||
"(plus vault_name). Same vault content on any device produces the "
|
||||
"same value; the server uses it to dedup connectors across devices."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class ConnectResponse(_PluginBase):
|
||||
|
|
|
|||
|
|
@ -355,6 +355,78 @@ async def delete_note(
|
|||
return True
|
||||
|
||||
|
||||
async def merge_obsidian_connectors(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
source: SearchSourceConnector,
|
||||
target: SearchSourceConnector,
|
||||
) -> None:
|
||||
"""Fold ``source``'s documents into ``target`` and delete ``source``.
|
||||
|
||||
Triggered when the fingerprint dedup detects two plugin connectors
|
||||
pointing at the same vault (e.g. a mobile install raced with iCloud
|
||||
hydration and got a partial fingerprint, then caught up). Path
|
||||
collisions resolve in favour of ``target`` (the surviving row);
|
||||
``source``'s duplicate documents are hard-deleted along with their
|
||||
chunks via the ``cascade='all, delete-orphan'`` on ``Document.chunks``.
|
||||
"""
|
||||
if source.id == target.id:
|
||||
return
|
||||
|
||||
target_vault_id = (target.config or {}).get("vault_id")
|
||||
target_search_space_id = target.search_space_id
|
||||
if not target_vault_id:
|
||||
raise RuntimeError("merge target is missing vault_id")
|
||||
|
||||
target_paths_result = await session.execute(
|
||||
select(Document).where(
|
||||
and_(
|
||||
Document.connector_id == target.id,
|
||||
Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
|
||||
)
|
||||
)
|
||||
)
|
||||
target_paths: set[str] = set()
|
||||
for doc in target_paths_result.scalars().all():
|
||||
meta = doc.document_metadata or {}
|
||||
path = meta.get("file_path")
|
||||
if path:
|
||||
target_paths.add(path)
|
||||
|
||||
source_docs_result = await session.execute(
|
||||
select(Document).where(
|
||||
and_(
|
||||
Document.connector_id == source.id,
|
||||
Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
for doc in source_docs_result.scalars().all():
|
||||
meta = dict(doc.document_metadata or {})
|
||||
path = meta.get("file_path")
|
||||
if not path or path in target_paths:
|
||||
await session.delete(doc)
|
||||
continue
|
||||
|
||||
new_unique_id = _vault_path_unique_id(target_vault_id, path)
|
||||
new_uid_hash = generate_unique_identifier_hash(
|
||||
DocumentType.OBSIDIAN_CONNECTOR,
|
||||
new_unique_id,
|
||||
target_search_space_id,
|
||||
)
|
||||
meta["vault_id"] = target_vault_id
|
||||
meta["connector_id"] = target.id
|
||||
doc.document_metadata = meta
|
||||
doc.connector_id = target.id
|
||||
doc.search_space_id = target_search_space_id
|
||||
doc.unique_identifier_hash = new_uid_hash
|
||||
target_paths.add(path)
|
||||
|
||||
await session.flush()
|
||||
await session.delete(source)
|
||||
|
||||
|
||||
async def get_manifest(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
|
|
|
|||
|
|
@ -1,15 +1,18 @@
|
|||
"""Integration tests for the Obsidian plugin HTTP wire contract.
|
||||
|
||||
Two concerns:
|
||||
Three concerns:
|
||||
|
||||
1. The /connect upsert really collapses concurrent first-time connects to
|
||||
exactly one row. This locks the partial unique index in migration 129
|
||||
exactly one row. This locks the partial unique index from migration 129
|
||||
to its purpose.
|
||||
2. The end-to-end response shapes returned by /connect /sync /rename
|
||||
2. The fingerprint dedup path: a second device connecting with a fresh
|
||||
``vault_id`` but the same ``vault_fingerprint`` adopts the existing
|
||||
connector instead of creating a duplicate.
|
||||
3. The end-to-end response shapes returned by /connect /sync /rename
|
||||
/notes /manifest /stats match the schemas the plugin's TypeScript
|
||||
decoders expect. Each new field renamed (results -> items, accepted ->
|
||||
indexed, etc.) is a contract change, and a smoke pass like this is
|
||||
the cheapest way to catch a future drift before it ships.
|
||||
decoders expect. Each renamed field is a contract change, and a smoke
|
||||
pass like this is the cheapest way to catch a future drift before it
|
||||
ships.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
|
@ -26,8 +29,6 @@ from sqlalchemy.exc import IntegrityError
|
|||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
SearchSpace,
|
||||
|
|
@ -135,12 +136,14 @@ class TestConnectRace:
|
|||
async def test_concurrent_first_connects_collapse_to_one_row(
|
||||
self, async_engine, race_user_and_space
|
||||
):
|
||||
"""Two simultaneous /connect calls for the same vault_id should
|
||||
produce exactly one row, not two. This relies on the partial
|
||||
unique index added in migration 129 plus the
|
||||
ON CONFLICT DO UPDATE in obsidian_connect."""
|
||||
"""Two simultaneous /connect calls for the same vault should
|
||||
produce exactly one row, not two. Same vault_id + same
|
||||
fingerprint funnels through both partial unique indexes; the
|
||||
loser falls back to the survivor row via the IntegrityError
|
||||
branch in obsidian_connect."""
|
||||
user_id, space_id = race_user_and_space
|
||||
vault_id = str(uuid.uuid4())
|
||||
fingerprint = "fp-" + uuid.uuid4().hex
|
||||
|
||||
async def _call(name_suffix: str) -> None:
|
||||
async with AsyncSession(async_engine) as s:
|
||||
|
|
@ -149,21 +152,16 @@ class TestConnectRace:
|
|||
vault_id=vault_id,
|
||||
vault_name=f"My Vault {name_suffix}",
|
||||
search_space_id=space_id,
|
||||
vault_fingerprint=fingerprint,
|
||||
)
|
||||
await obsidian_connect(payload, user=fresh_user, session=s)
|
||||
|
||||
results = await asyncio.gather(
|
||||
_call("a"), _call("b"), return_exceptions=True
|
||||
)
|
||||
# Both calls should succeed (ON CONFLICT collapses, doesn't raise).
|
||||
for r in results:
|
||||
assert not isinstance(r, Exception), f"Connect raised: {r!r}"
|
||||
|
||||
# The fixture creates a fresh user per test, so a count scoped to
|
||||
# ``user_id`` is equivalent to "rows for this vault" without
|
||||
# needing the JSON-path filter (which only works against a real
|
||||
# postgres dialect at compile time, not against a bare model
|
||||
# expression in a test).
|
||||
async with AsyncSession(async_engine) as verify:
|
||||
count = (
|
||||
await verify.execute(
|
||||
|
|
@ -177,11 +175,8 @@ class TestConnectRace:
|
|||
async def test_partial_unique_index_blocks_raw_duplicate(
|
||||
self, async_engine, race_user_and_space
|
||||
):
|
||||
"""If the partial unique index were missing, two raw INSERTs of
|
||||
plugin-Obsidian rows for the same (user_id, vault_id) would both
|
||||
succeed. With the index in place the second one must raise
|
||||
IntegrityError. This guards the schema regardless of whether the
|
||||
route logic is correct."""
|
||||
"""Raw INSERTs that bypass the route must still be blocked by
|
||||
the partial unique indexes from migration 129."""
|
||||
user_id, space_id = race_user_and_space
|
||||
vault_id = str(uuid.uuid4())
|
||||
|
||||
|
|
@ -195,6 +190,7 @@ class TestConnectRace:
|
|||
"vault_id": vault_id,
|
||||
"vault_name": "First",
|
||||
"source": "plugin",
|
||||
"vault_fingerprint": "fp-1",
|
||||
},
|
||||
user_id=user_id,
|
||||
search_space_id=space_id,
|
||||
|
|
@ -213,6 +209,7 @@ class TestConnectRace:
|
|||
"vault_id": vault_id,
|
||||
"vault_name": "Second",
|
||||
"source": "plugin",
|
||||
"vault_fingerprint": "fp-2",
|
||||
},
|
||||
user_id=user_id,
|
||||
search_space_id=space_id,
|
||||
|
|
@ -220,6 +217,102 @@ class TestConnectRace:
|
|||
)
|
||||
await s.commit()
|
||||
|
||||
async def test_fingerprint_blocks_raw_cross_device_duplicate(
|
||||
self, async_engine, race_user_and_space
|
||||
):
|
||||
"""Two connectors for the same user with different vault_ids but
|
||||
the same fingerprint cannot coexist."""
|
||||
user_id, space_id = race_user_and_space
|
||||
fingerprint = "fp-" + uuid.uuid4().hex
|
||||
|
||||
async with AsyncSession(async_engine) as s:
|
||||
s.add(
|
||||
SearchSourceConnector(
|
||||
name="Obsidian \u2014 Desktop",
|
||||
connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
|
||||
is_indexable=False,
|
||||
config={
|
||||
"vault_id": str(uuid.uuid4()),
|
||||
"vault_name": "Vault",
|
||||
"source": "plugin",
|
||||
"vault_fingerprint": fingerprint,
|
||||
},
|
||||
user_id=user_id,
|
||||
search_space_id=space_id,
|
||||
)
|
||||
)
|
||||
await s.commit()
|
||||
|
||||
with pytest.raises(IntegrityError):
|
||||
async with AsyncSession(async_engine) as s:
|
||||
s.add(
|
||||
SearchSourceConnector(
|
||||
name="Obsidian \u2014 Mobile",
|
||||
connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
|
||||
is_indexable=False,
|
||||
config={
|
||||
"vault_id": str(uuid.uuid4()),
|
||||
"vault_name": "Vault",
|
||||
"source": "plugin",
|
||||
"vault_fingerprint": fingerprint,
|
||||
},
|
||||
user_id=user_id,
|
||||
search_space_id=space_id,
|
||||
)
|
||||
)
|
||||
await s.commit()
|
||||
|
||||
async def test_second_device_adopts_existing_connector_via_fingerprint(
|
||||
self, async_engine, race_user_and_space
|
||||
):
|
||||
"""Device A connects with vault_id=A. Device B then connects with
|
||||
a fresh vault_id=B but the same fingerprint. The route must
|
||||
return A's identity (not create a B row), proving cross-device
|
||||
dedup happens transparently to the plugin."""
|
||||
user_id, space_id = race_user_and_space
|
||||
vault_id_a = str(uuid.uuid4())
|
||||
vault_id_b = str(uuid.uuid4())
|
||||
fingerprint = "fp-" + uuid.uuid4().hex
|
||||
|
||||
async with AsyncSession(async_engine) as s:
|
||||
fresh_user = await s.get(User, user_id)
|
||||
resp_a = await obsidian_connect(
|
||||
ConnectRequest(
|
||||
vault_id=vault_id_a,
|
||||
vault_name="Shared Vault",
|
||||
search_space_id=space_id,
|
||||
vault_fingerprint=fingerprint,
|
||||
),
|
||||
user=fresh_user,
|
||||
session=s,
|
||||
)
|
||||
|
||||
async with AsyncSession(async_engine) as s:
|
||||
fresh_user = await s.get(User, user_id)
|
||||
resp_b = await obsidian_connect(
|
||||
ConnectRequest(
|
||||
vault_id=vault_id_b,
|
||||
vault_name="Shared Vault",
|
||||
search_space_id=space_id,
|
||||
vault_fingerprint=fingerprint,
|
||||
),
|
||||
user=fresh_user,
|
||||
session=s,
|
||||
)
|
||||
|
||||
assert resp_b.vault_id == vault_id_a
|
||||
assert resp_b.connector_id == resp_a.connector_id
|
||||
|
||||
async with AsyncSession(async_engine) as verify:
|
||||
count = (
|
||||
await verify.execute(
|
||||
select(func.count(SearchSourceConnector.id)).where(
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
)
|
||||
)
|
||||
).scalar_one()
|
||||
assert count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Combined wire-shape smoke test
|
||||
|
|
@ -244,6 +337,7 @@ class TestWireContractSmoke:
|
|||
vault_id=vault_id,
|
||||
vault_name="Smoke Vault",
|
||||
search_space_id=db_search_space.id,
|
||||
vault_fingerprint="fp-" + uuid.uuid4().hex,
|
||||
),
|
||||
user=db_user,
|
||||
session=db_session,
|
||||
|
|
|
|||
|
|
@ -110,6 +110,7 @@ export class SurfSenseApiClient {
|
|||
searchSpaceId: number;
|
||||
vaultId: string;
|
||||
vaultName: string;
|
||||
vaultFingerprint: string;
|
||||
}): Promise<ConnectResponse> {
|
||||
return await this.request<ConnectResponse>(
|
||||
"POST",
|
||||
|
|
@ -118,6 +119,7 @@ export class SurfSenseApiClient {
|
|||
vault_id: input.vaultId,
|
||||
vault_name: input.vaultName,
|
||||
search_space_id: input.searchSpaceId,
|
||||
vault_fingerprint: input.vaultFingerprint,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import {
|
|||
type StatusState,
|
||||
type SurfsensePluginSettings,
|
||||
} from "./types";
|
||||
import { generateVaultUuid } from "./vault-identity";
|
||||
|
||||
/** SurfSense plugin entry point. */
|
||||
export default class SurfSensePlugin extends Plugin {
|
||||
|
|
@ -167,6 +168,25 @@ export default class SurfSensePlugin extends Plugin {
|
|||
this.queue?.requestStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Obsidian fires this when another device rewrites our data.json.
|
||||
* If the synced vault_id differs from ours, adopt it and
|
||||
* re-handshake so the server routes us to the right row.
|
||||
*/
|
||||
async onExternalSettingsChange(): Promise<void> {
|
||||
const previousVaultId = this.settings.vaultId;
|
||||
const previousConnectorId = this.settings.connectorId;
|
||||
await this.loadSettings();
|
||||
const changed =
|
||||
this.settings.vaultId !== previousVaultId ||
|
||||
this.settings.connectorId !== previousConnectorId;
|
||||
if (!changed) return;
|
||||
this.notifyStatusChange();
|
||||
if (this.settings.searchSpaceId !== null) {
|
||||
void this.engine.ensureConnected();
|
||||
}
|
||||
}
|
||||
|
||||
get queueDepth(): number {
|
||||
return this.queue?.size ?? 0;
|
||||
}
|
||||
|
|
@ -238,10 +258,15 @@ export default class SurfSensePlugin extends Plugin {
|
|||
await this.saveData(this.settings);
|
||||
}
|
||||
|
||||
/** Mint vault_id (in data.json, travels with the vault) on first run. */
|
||||
/**
|
||||
* Mint a tentative vault_id locally on first run. The server's
|
||||
* fingerprint dedup (see /obsidian/connect) may overwrite it on the
|
||||
* first /connect when another device of the same vault has already
|
||||
* registered; we always trust the server's response.
|
||||
*/
|
||||
private seedIdentity(): void {
|
||||
if (!this.settings.vaultId) {
|
||||
this.settings.vaultId = generateUuid();
|
||||
this.settings.vaultId = generateVaultUuid();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -252,17 +277,3 @@ interface NetworkConnection {
|
|||
addEventListener?: (event: string, handler: () => void) => void;
|
||||
removeEventListener?: (event: string, handler: () => void) => void;
|
||||
}
|
||||
|
||||
function generateUuid(): string {
|
||||
const c = globalThis.crypto;
|
||||
if (c?.randomUUID) return c.randomUUID();
|
||||
const buf = new Uint8Array(16);
|
||||
c.getRandomValues(buf);
|
||||
buf[6] = ((buf[6] ?? 0) & 0x0f) | 0x40;
|
||||
buf[8] = ((buf[8] ?? 0) & 0x3f) | 0x80;
|
||||
const hex = Array.from(buf, (b) => b.toString(16).padStart(2, "0")).join("");
|
||||
return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(
|
||||
16,
|
||||
20,
|
||||
)}-${hex.slice(20)}`;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import type {
|
|||
StatusKind,
|
||||
StatusState,
|
||||
} from "./types";
|
||||
import { computeVaultFingerprint } from "./vault-identity";
|
||||
|
||||
/**
|
||||
* Owner of "what does the vault look like vs the server" reasoning.
|
||||
|
|
@ -110,7 +111,14 @@ export class SyncEngine {
|
|||
this.setStatus(this.queueStatusKind(), undefined);
|
||||
}
|
||||
|
||||
/** Public entry point used after settings save to (re)connect the vault. */
|
||||
/**
|
||||
* (Re)register the vault with the server.
|
||||
*
|
||||
* Always trusts the server's response: when fingerprint dedup routes
|
||||
* us to another device's connector, ``resp.vault_id`` may differ from
|
||||
* what we sent and we adopt it locally so future /sync calls land on
|
||||
* the right row.
|
||||
*/
|
||||
async ensureConnected(): Promise<void> {
|
||||
const settings = this.deps.getSettings();
|
||||
if (!settings.searchSpaceId) {
|
||||
|
|
@ -118,13 +126,16 @@ export class SyncEngine {
|
|||
return;
|
||||
}
|
||||
try {
|
||||
const fingerprint = await computeVaultFingerprint(this.deps.app);
|
||||
const resp = await this.deps.apiClient.connect({
|
||||
searchSpaceId: settings.searchSpaceId,
|
||||
vaultId: settings.vaultId,
|
||||
vaultName: this.deps.app.vault.getName(),
|
||||
vaultFingerprint: fingerprint,
|
||||
});
|
||||
this.applyHealth(resp);
|
||||
await this.deps.saveSettings((s) => {
|
||||
s.vaultId = resp.vault_id;
|
||||
s.connectorId = resp.connector_id;
|
||||
});
|
||||
} catch (err) {
|
||||
|
|
@ -385,11 +396,19 @@ export class SyncEngine {
|
|||
if (Date.now() - settings.lastReconcileAt < RECONCILE_MIN_INTERVAL_MS) return;
|
||||
}
|
||||
|
||||
// Re-handshake first so the server sees this device's current
|
||||
// fingerprint. If the vault grew since last connect and now
|
||||
// matches another device's row, the server merges and routes us
|
||||
// to the survivor; subsequent /manifest call uses the adopted id.
|
||||
await this.ensureConnected();
|
||||
const refreshed = this.deps.getSettings();
|
||||
if (!refreshed.connectorId) return;
|
||||
|
||||
this.setStatus("syncing", "Reconciling vault with server…");
|
||||
try {
|
||||
const manifest = await this.deps.apiClient.getManifest(settings.vaultId);
|
||||
const manifest = await this.deps.apiClient.getManifest(refreshed.vaultId);
|
||||
const remote = manifest.items ?? {};
|
||||
const enqueued = this.diffAndQueue(settings, remote);
|
||||
const enqueued = this.diffAndQueue(refreshed, remote);
|
||||
await this.deps.saveSettings((s) => {
|
||||
s.lastReconcileAt = Date.now();
|
||||
s.tombstones = pruneTombstones(s.tombstones);
|
||||
|
|
|
|||
43
surfsense_obsidian/src/vault-identity.ts
Normal file
43
surfsense_obsidian/src/vault-identity.ts
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
import type { App } from "obsidian";
|
||||
|
||||
/**
|
||||
* Deterministic SHA-256 over the vault name + sorted markdown paths.
|
||||
*
|
||||
* Two devices observing the same vault content compute the same value,
|
||||
* regardless of how it was synced (iCloud, Syncthing, Obsidian Sync, …).
|
||||
* The server uses this as the cross-device dedup key on /connect.
|
||||
*/
|
||||
export async function computeVaultFingerprint(app: App): Promise<string> {
|
||||
const vaultName = app.vault.getName();
|
||||
const paths = app.vault
|
||||
.getMarkdownFiles()
|
||||
.map((f) => f.path)
|
||||
.sort();
|
||||
const payload = `${vaultName}\n${paths.join("\n")}`;
|
||||
const bytes = new TextEncoder().encode(payload);
|
||||
const digest = await crypto.subtle.digest("SHA-256", bytes);
|
||||
return bufferToHex(digest);
|
||||
}
|
||||
|
||||
function bufferToHex(buf: ArrayBuffer): string {
|
||||
const view = new Uint8Array(buf);
|
||||
let hex = "";
|
||||
for (let i = 0; i < view.length; i++) {
|
||||
hex += (view[i] ?? 0).toString(16).padStart(2, "0");
|
||||
}
|
||||
return hex;
|
||||
}
|
||||
|
||||
export function generateVaultUuid(): string {
|
||||
const c = globalThis.crypto;
|
||||
if (c?.randomUUID) return c.randomUUID();
|
||||
const buf = new Uint8Array(16);
|
||||
c.getRandomValues(buf);
|
||||
buf[6] = ((buf[6] ?? 0) & 0x0f) | 0x40;
|
||||
buf[8] = ((buf[8] ?? 0) & 0x3f) | 0x80;
|
||||
const hex = Array.from(buf, (b) => b.toString(16).padStart(2, "0")).join("");
|
||||
return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(
|
||||
16,
|
||||
20,
|
||||
)}-${hex.slice(20)}`;
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue