mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-31 19:45:15 +02:00
Merge pull request #1286 from AnishSarkar22/feat/obsidian-plugin
feat: introduce Obsidian vault sync plugin
This commit is contained in:
commit
f607636ba6
83 changed files with 12540 additions and 1837 deletions
|
|
@ -90,6 +90,7 @@ celery_app = Celery(
|
|||
"app.tasks.celery_tasks.podcast_tasks",
|
||||
"app.tasks.celery_tasks.video_presentation_tasks",
|
||||
"app.tasks.celery_tasks.connector_tasks",
|
||||
"app.tasks.celery_tasks.obsidian_tasks",
|
||||
"app.tasks.celery_tasks.schedule_checker_task",
|
||||
"app.tasks.celery_tasks.document_reindex_tasks",
|
||||
"app.tasks.celery_tasks.stale_notification_cleanup_task",
|
||||
|
|
@ -144,8 +145,8 @@ celery_app.conf.update(
|
|||
"index_elasticsearch_documents": {"queue": CONNECTORS_QUEUE},
|
||||
"index_crawled_urls": {"queue": CONNECTORS_QUEUE},
|
||||
"index_bookstack_pages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_obsidian_vault": {"queue": CONNECTORS_QUEUE},
|
||||
"index_composio_connector": {"queue": CONNECTORS_QUEUE},
|
||||
"index_obsidian_attachment": {"queue": CONNECTORS_QUEUE},
|
||||
# Everything else (document processing, podcasts, reindexing,
|
||||
# schedule checker, cleanup) stays on the default fast queue.
|
||||
},
|
||||
|
|
|
|||
|
|
@ -1510,6 +1510,31 @@ class SearchSourceConnector(BaseModel, TimestampMixin):
|
|||
"name",
|
||||
name="uq_searchspace_user_connector_type_name",
|
||||
),
|
||||
# Mirrors migration 129; backs the ``/obsidian/connect`` upsert.
|
||||
Index(
|
||||
"search_source_connectors_obsidian_plugin_vault_uniq",
|
||||
"user_id",
|
||||
text("(config->>'vault_id')"),
|
||||
unique=True,
|
||||
postgresql_where=text(
|
||||
"connector_type = 'OBSIDIAN_CONNECTOR' "
|
||||
"AND config->>'source' = 'plugin' "
|
||||
"AND config->>'vault_id' IS NOT NULL"
|
||||
),
|
||||
),
|
||||
# Cross-device dedup: same vault content from different devices
|
||||
# cannot produce two connector rows.
|
||||
Index(
|
||||
"search_source_connectors_obsidian_plugin_fingerprint_uniq",
|
||||
"user_id",
|
||||
text("(config->>'vault_fingerprint')"),
|
||||
unique=True,
|
||||
postgresql_where=text(
|
||||
"connector_type = 'OBSIDIAN_CONNECTOR' "
|
||||
"AND config->>'source' = 'plugin' "
|
||||
"AND config->>'vault_fingerprint' IS NOT NULL"
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
name = Column(String(100), nullable=False, index=True)
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ from .new_llm_config_routes import router as new_llm_config_router
|
|||
from .notes_routes import router as notes_router
|
||||
from .notifications_routes import router as notifications_router
|
||||
from .notion_add_connector_route import router as notion_add_connector_router
|
||||
from .obsidian_plugin_routes import router as obsidian_plugin_router
|
||||
from .onedrive_add_connector_route import router as onedrive_add_connector_router
|
||||
from .podcasts_routes import router as podcasts_router
|
||||
from .prompts_routes import router as prompts_router
|
||||
|
|
@ -85,6 +86,7 @@ router.include_router(notion_add_connector_router)
|
|||
router.include_router(slack_add_connector_router)
|
||||
router.include_router(teams_add_connector_router)
|
||||
router.include_router(onedrive_add_connector_router)
|
||||
router.include_router(obsidian_plugin_router) # Obsidian plugin push API
|
||||
router.include_router(discord_add_connector_router)
|
||||
router.include_router(jira_add_connector_router)
|
||||
router.include_router(confluence_add_connector_router)
|
||||
|
|
|
|||
706
surfsense_backend/app/routes/obsidian_plugin_routes.py
Normal file
706
surfsense_backend/app/routes/obsidian_plugin_routes.py
Normal file
|
|
@ -0,0 +1,706 @@
|
|||
"""Obsidian plugin ingestion routes (``/api/v1/obsidian/*``).
|
||||
|
||||
Wire surface for the ``surfsense_obsidian/`` plugin. Versioning anchor is
|
||||
the ``/api/v1/`` URL prefix; additive feature detection rides the
|
||||
``capabilities`` array on /health and /connect.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
from sqlalchemy import and_, case, func
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
SearchSpace,
|
||||
User,
|
||||
get_async_session,
|
||||
)
|
||||
from app.schemas.obsidian_plugin import (
|
||||
ALLOWED_ATTACHMENT_EXTENSIONS,
|
||||
ATTACHMENT_MIME_TYPES,
|
||||
ConnectRequest,
|
||||
ConnectResponse,
|
||||
DeleteAck,
|
||||
DeleteAckItem,
|
||||
DeleteBatchRequest,
|
||||
HealthResponse,
|
||||
ManifestResponse,
|
||||
RenameAck,
|
||||
RenameAckItem,
|
||||
RenameBatchRequest,
|
||||
StatsResponse,
|
||||
SyncAck,
|
||||
SyncAckItem,
|
||||
SyncBatchRequest,
|
||||
)
|
||||
from app.services.notification_service import NotificationService
|
||||
from app.services.obsidian_plugin_indexer import (
|
||||
delete_note,
|
||||
get_manifest,
|
||||
merge_obsidian_connectors,
|
||||
rename_note,
|
||||
upsert_note,
|
||||
)
|
||||
from app.tasks.celery_tasks.obsidian_tasks import index_obsidian_attachment_task
|
||||
from app.users import current_active_user
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/obsidian", tags=["obsidian-plugin"])
|
||||
|
||||
|
||||
# Feature flags advertised to the plugin in the ``capabilities`` array of the
# /health and /connect responses. Plugins feature-gate on these, so the list
# is append-only: add entries, never rename or remove.
OBSIDIAN_CAPABILITIES: list[str] = ["sync", "rename", "delete", "manifest", "stats"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_handshake() -> dict[str, object]:
    """Return the handshake fields shared by the /health and /connect responses.

    The capability list is copied so a caller can never mutate the
    module-level ``OBSIDIAN_CAPABILITIES`` constant through the result.
    """
    advertised = [*OBSIDIAN_CAPABILITIES]
    return {"capabilities": advertised}
|
||||
|
||||
|
||||
def _connector_type_value(connector: SearchSourceConnector) -> str:
|
||||
connector_type = connector.connector_type
|
||||
if hasattr(connector_type, "value"):
|
||||
return str(connector_type.value)
|
||||
return str(connector_type)
|
||||
|
||||
|
||||
async def _start_obsidian_sync_notification(
    session: AsyncSession,
    *,
    user: User,
    connector: SearchSourceConnector,
    total_count: int,
):
    """Open (or reuse) the connector's rolling sync inbox item.

    Plugin sync is continuous and batched, so one notification per connector
    is reused — keyed by a stable ``operation_id`` derived from the connector
    id — instead of creating a new notification for every batch.

    Args:
        session: Database session handed to the notification handler.
        user: Owner of the notification.
        connector: Obsidian connector the batch belongs to.
        total_count: Number of notes in the current batch.

    Returns:
        The result of ``update_notification`` after the item has been put
        into the ``in_progress`` state.
    """
    notifier = NotificationService.connector_indexing
    label = connector.name or "Obsidian"

    # Metadata used only when the notification does not exist yet.
    seed_metadata = {
        "connector_id": connector.id,
        "connector_name": label,
        "connector_type": _connector_type_value(connector),
        "sync_stage": "processing",
        "indexed_count": 0,
        "failed_count": 0,
        "total_count": total_count,
        "source": "obsidian_plugin",
    }
    inbox_item = await notifier.find_or_create_notification(
        session=session,
        user_id=user.id,
        operation_id=f"obsidian_sync_connector_{connector.id}",
        title=f"Syncing: {label}",
        message="Syncing from Obsidian plugin",
        search_space_id=connector.search_space_id,
        initial_metadata=seed_metadata,
    )

    # Re-arm a reused item for this batch.
    progress_fields = {"sync_stage": "processing", "total_count": total_count}
    return await notifier.update_notification(
        session=session,
        notification=inbox_item,
        status="in_progress",
        metadata_updates=progress_fields,
    )
|
||||
|
||||
|
||||
async def _finish_obsidian_sync_notification(
    session: AsyncSession,
    *,
    notification,
    indexed: int,
    failed: int,
):
    """Close out the rolling Obsidian sync inbox item for one batch.

    The batch counts as failed only when at least one file failed and none
    succeeded; every other outcome (including partial success and an empty
    batch) is reported as completed.

    Args:
        session: Database session handed to the notification handler.
        notification: Inbox item previously opened for this connector.
        indexed: Number of files counted as synced in this batch.
        failed: Number of files that failed in this batch.
    """
    notifier = NotificationService.connector_indexing
    label = notification.notification_metadata.get("connector_name", "Obsidian")

    nothing_succeeded = failed > 0 and indexed == 0
    # Status and stage always carry the same value for a finished batch.
    outcome = "failed" if nothing_succeeded else "completed"

    if nothing_succeeded:
        headline = f"Failed: {label}"
        if failed > 1:
            summary = f"Sync failed: {failed} file(s) failed"
        else:
            summary = "Sync failed: 1 file failed"
    else:
        headline = f"Ready: {label}"
        if failed > 0:
            summary = f"Partially synced: {indexed} file(s) synced, {failed} failed."
        elif indexed == 0:
            summary = "Already up to date!"
        elif indexed == 1:
            summary = "Now searchable! 1 file synced."
        else:
            summary = f"Now searchable! {indexed} files synced."

    final_counts = {
        "indexed_count": indexed,
        "failed_count": failed,
        "sync_stage": outcome,
    }
    await notifier.update_notification(
        session=session,
        notification=notification,
        title=headline,
        message=summary,
        status=outcome,
        metadata_updates=final_counts,
    )
|
||||
|
||||
|
||||
async def _resolve_vault_connector(
    session: AsyncSession,
    *,
    user: User,
    vault_id: str,
) -> SearchSourceConnector:
    """Return the plugin-sourced OBSIDIAN_CONNECTOR row owning ``vault_id``.

    The lookup is delegated to ``_find_by_vault_id`` so the predicate
    (user, connector type, ``config->>'source' = 'plugin'``,
    ``config->>'vault_id'``) lives in exactly one place and cannot drift
    between the /connect path and the per-request resolution path.

    Args:
        session: Database session used for the lookup.
        user: Authenticated caller; only their connectors are considered.
        vault_id: Plugin-side vault identifier.

    Returns:
        The matching connector row.

    Raises:
        HTTPException: 404 with code ``VAULT_NOT_REGISTERED`` when the user
            has no plugin connector for this vault.
    """
    connector = await _find_by_vault_id(
        session, user_id=user.id, vault_id=vault_id
    )
    if connector is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={
                "code": "VAULT_NOT_REGISTERED",
                "message": (
                    "No Obsidian plugin connector found for this vault. "
                    "Call POST /obsidian/connect first."
                ),
                "vault_id": vault_id,
            },
        )
    return connector
|
||||
|
||||
|
||||
def _queue_obsidian_attachment(
    *, connector_id: int, note_payload: dict, user_id: str
) -> None:
    """Hand one non-markdown Obsidian note to the background attachment task.

    Args:
        connector_id: Id of the connector the note belongs to.
        note_payload: Note data forwarded to the task as ``payload_data``.
        user_id: Id of the user who pushed the note, as a string.
    """
    task_kwargs = {
        "connector_id": connector_id,
        "payload_data": note_payload,
        "user_id": user_id,
    }
    index_obsidian_attachment_task.delay(**task_kwargs)
|
||||
|
||||
|
||||
async def _ensure_search_space_access(
    session: AsyncSession,
    *,
    user: User,
    search_space_id: int,
) -> SearchSpace:
    """Return the search space only if ``user`` owns it.

    Access is owner-only for now (shared spaces are a follow-up).

    Raises:
        HTTPException: 403 with code ``SEARCH_SPACE_FORBIDDEN`` when no
            search space with this id is owned by the user.
    """
    owned_space_query = select(SearchSpace).where(
        SearchSpace.id == search_space_id,
        SearchSpace.user_id == user.id,
    )
    space = (await session.execute(owned_space_query)).scalars().first()
    if space is not None:
        return space

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail={
            "code": "SEARCH_SPACE_FORBIDDEN",
            "message": "You don't own that search space.",
        },
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/health", response_model=HealthResponse)
async def obsidian_health(
    user: User = Depends(current_active_user),
) -> HealthResponse:
    """Serve the API contract handshake (the plugin caches it per onload).

    Requires an authenticated user; the response carries the advertised
    capabilities plus the server's current UTC time.
    """
    handshake = _build_handshake()
    return HealthResponse(server_time_utc=datetime.now(UTC), **handshake)
|
||||
|
||||
|
||||
async def _find_by_vault_id(
    session: AsyncSession, *, user_id, vault_id: str
) -> SearchSourceConnector | None:
    """Look up the user's plugin-sourced Obsidian connector by ``vault_id``.

    Returns the first matching row, or ``None`` when there is none.
    """
    config = SearchSourceConnector.config
    lookup = select(SearchSourceConnector).where(
        SearchSourceConnector.user_id == user_id,
        SearchSourceConnector.connector_type
        == SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
        config["source"].as_string() == "plugin",
        config["vault_id"].as_string() == vault_id,
    )
    result = await session.execute(lookup)
    return result.scalars().first()
|
||||
|
||||
|
||||
async def _find_by_fingerprint(
    session: AsyncSession, *, user_id, vault_fingerprint: str
) -> SearchSourceConnector | None:
    """Look up the user's plugin-sourced Obsidian connector by fingerprint.

    Returns the first matching row, or ``None`` when there is none.
    """
    config = SearchSourceConnector.config
    lookup = select(SearchSourceConnector).where(
        SearchSourceConnector.user_id == user_id,
        SearchSourceConnector.connector_type
        == SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
        config["source"].as_string() == "plugin",
        config["vault_fingerprint"].as_string() == vault_fingerprint,
    )
    result = await session.execute(lookup)
    return result.scalars().first()
|
||||
|
||||
|
||||
def _build_config(payload: ConnectRequest, *, now_iso: str) -> dict[str, object]:
|
||||
return {
|
||||
"vault_id": payload.vault_id,
|
||||
"vault_name": payload.vault_name,
|
||||
"vault_fingerprint": payload.vault_fingerprint,
|
||||
"source": "plugin",
|
||||
"last_connect_at": now_iso,
|
||||
}
|
||||
|
||||
|
||||
def _display_name(vault_name: str) -> str:
|
||||
return f"Obsidian - {vault_name}"
|
||||
|
||||
|
||||
@router.post("/connect", response_model=ConnectResponse)
async def obsidian_connect(
    payload: ConnectRequest,
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
) -> ConnectResponse:
    """Register a vault, refresh an existing one, or adopt another device's row.

    Resolution order:
      1. ``(user_id, vault_id)`` → known device, refresh metadata.
      2. ``(user_id, vault_fingerprint)`` → another device of the same vault,
         caller adopts the surviving ``vault_id``.
      3. Insert a new row.

    Fingerprint collisions on (1) trigger ``merge_obsidian_connectors`` so
    the partial unique index can never produce two live rows for one vault.

    Raises:
        HTTPException: 403 when the caller does not own
            ``payload.search_space_id``; 409 when the insert was skipped by
            a conflict but no row matching the fingerprint or vault id can
            be found afterwards.
    """
    await _ensure_search_space_access(
        session, user=user, search_space_id=payload.search_space_id
    )

    now_iso = datetime.now(UTC).isoformat()
    cfg = _build_config(payload, now_iso=now_iso)
    display_name = _display_name(payload.vault_name)

    # --- (1) Known vault_id -------------------------------------------------
    existing_by_vid = await _find_by_vault_id(
        session, user_id=user.id, vault_id=payload.vault_id
    )
    if existing_by_vid is not None:
        # A *different* row already holding this fingerprint means the same
        # vault was registered twice; fold this row into the other one.
        collision = await _find_by_fingerprint(
            session, user_id=user.id, vault_fingerprint=payload.vault_fingerprint
        )
        if collision is not None and collision.id != existing_by_vid.id:
            await merge_obsidian_connectors(
                session, source=existing_by_vid, target=collision
            )
            # Only the name and timestamp are refreshed; the surviving row
            # keeps its own vault_id, which is what the caller gets back.
            collision_cfg = dict(collision.config or {})
            collision_cfg["vault_name"] = payload.vault_name
            collision_cfg["last_connect_at"] = now_iso
            collision.config = collision_cfg
            collision.name = _display_name(payload.vault_name)
            # The response is built before the commit in every branch.
            # NOTE(review): presumably so ORM attributes are read before the
            # commit can expire them — confirm against the session config.
            response = ConnectResponse(
                connector_id=collision.id,
                vault_id=collision_cfg["vault_id"],
                search_space_id=collision.search_space_id,
                server_time_utc=datetime.now(UTC),
                **_build_handshake(),
            )
            await session.commit()
            return response

        # No collision: refresh the known row in place.
        # NOTE(review): ``config`` is replaced wholesale here, whereas the
        # merge and fingerprint branches update a copy of the existing
        # config — confirm no other keys are expected to survive a reconnect.
        existing_by_vid.name = display_name
        existing_by_vid.config = cfg
        existing_by_vid.search_space_id = payload.search_space_id
        existing_by_vid.is_indexable = False
        response = ConnectResponse(
            connector_id=existing_by_vid.id,
            vault_id=payload.vault_id,
            search_space_id=existing_by_vid.search_space_id,
            server_time_utc=datetime.now(UTC),
            **_build_handshake(),
        )
        await session.commit()
        return response

    # --- (2) Unknown vault_id, known fingerprint ----------------------------
    existing_by_fp = await _find_by_fingerprint(
        session, user_id=user.id, vault_fingerprint=payload.vault_fingerprint
    )
    if existing_by_fp is not None:
        # The caller adopts the existing row's vault_id; the row's
        # search_space_id is left untouched in this branch.
        survivor_cfg = dict(existing_by_fp.config or {})
        survivor_cfg["vault_name"] = payload.vault_name
        survivor_cfg["last_connect_at"] = now_iso
        existing_by_fp.config = survivor_cfg
        existing_by_fp.name = display_name
        response = ConnectResponse(
            connector_id=existing_by_fp.id,
            vault_id=survivor_cfg["vault_id"],
            search_space_id=existing_by_fp.search_space_id,
            server_time_utc=datetime.now(UTC),
            **_build_handshake(),
        )
        await session.commit()
        return response

    # --- (3) First-time registration ----------------------------------------
    # ON CONFLICT DO NOTHING matches any unique index (vault_id OR
    # fingerprint), so concurrent first-time connects from two devices
    # of the same vault never raise IntegrityError — the loser just
    # gets an empty RETURNING and falls through to re-fetch the winner.
    # NOTE(review): with no conflict target, any other unique constraint on
    # the table would also be matched (presumably including the
    # connector-name constraint); that case would end in the 409 below —
    # confirm this is acceptable.
    insert_stmt = (
        pg_insert(SearchSourceConnector)
        .values(
            name=display_name,
            connector_type=SearchSourceConnectorType.OBSIDIAN_CONNECTOR,
            is_indexable=False,
            config=cfg,
            user_id=user.id,
            search_space_id=payload.search_space_id,
        )
        .on_conflict_do_nothing()
        .returning(
            SearchSourceConnector.id,
            SearchSourceConnector.search_space_id,
        )
    )
    inserted = (await session.execute(insert_stmt)).first()
    if inserted is not None:
        response = ConnectResponse(
            connector_id=inserted.id,
            vault_id=payload.vault_id,
            search_space_id=inserted.search_space_id,
            server_time_utc=datetime.now(UTC),
            **_build_handshake(),
        )
        await session.commit()
        return response

    # Insert was skipped by a conflict: locate the row that won, trying the
    # fingerprint first and the vault_id second.
    winner = await _find_by_fingerprint(
        session, user_id=user.id, vault_fingerprint=payload.vault_fingerprint
    )
    if winner is None:
        winner = await _find_by_vault_id(
            session, user_id=user.id, vault_id=payload.vault_id
        )
    if winner is None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="vault registration conflicted but winning row could not be located",
        )
    response = ConnectResponse(
        connector_id=winner.id,
        vault_id=(winner.config or {})["vault_id"],
        search_space_id=winner.search_space_id,
        server_time_utc=datetime.now(UTC),
        **_build_handshake(),
    )
    await session.commit()
    return response
|
||||
|
||||
|
||||
def _attachment_rejection_reason(note) -> str | None:
    """Return why a binary note must be rejected, or ``None`` if it is acceptable."""
    ext = note.extension.lstrip(".").lower()
    if ext not in ALLOWED_ATTACHMENT_EXTENSIONS:
        return f"unsupported attachment extension: .{ext}"
    if note.mime_type != ATTACHMENT_MIME_TYPES[ext]:
        return (
            f"mime_type '{note.mime_type}' does not match "
            f"extension .{ext}"
        )
    return None


@router.post("/sync", response_model=SyncAck)
async def obsidian_sync(
    payload: SyncBatchRequest,
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
) -> SyncAck:
    """Batch-upsert notes and acknowledge each one individually.

    The per-note ack lets the plugin dequeue what succeeded and retry what
    failed. Binary notes are validated against the attachment whitelist and
    queued for background processing; all other notes are upserted inline.
    Notification bookkeeping is best-effort: a failure there is logged and
    never fails the sync itself.

    Raises:
        HTTPException: 404 when the vault has no registered connector; any
            ``HTTPException`` raised while processing a note is propagated.
    """
    connector = await _resolve_vault_connector(
        session, user=user, vault_id=payload.vault_id
    )

    notification = None
    try:
        notification = await _start_obsidian_sync_notification(
            session, user=user, connector=connector, total_count=len(payload.notes)
        )
    except Exception:
        logger.warning(
            "obsidian sync notification start failed connector=%s user=%s",
            connector.id,
            user.id,
            exc_info=True,
        )

    items: list[SyncAckItem] = []
    indexed = 0
    failed = 0

    for note in payload.notes:
        try:
            if note.is_binary:
                rejection = _attachment_rejection_reason(note)
                if rejection is not None:
                    failed += 1
                    ack = SyncAckItem(path=note.path, status="error", error=rejection)
                else:
                    _queue_obsidian_attachment(
                        connector_id=connector.id,
                        note_payload=note.model_dump(mode="json"),
                        user_id=str(user.id),
                    )
                    # Queued attachments count towards ``indexed``.
                    indexed += 1
                    ack = SyncAckItem(path=note.path, status="queued")
            else:
                doc = await upsert_note(
                    session, connector=connector, payload=note, user_id=str(user.id)
                )
                indexed += 1
                ack = SyncAckItem(path=note.path, status="ok", document_id=doc.id)
            items.append(ack)
        except HTTPException:
            raise
        except Exception as exc:
            # One bad note must not abort the rest of the batch.
            failed += 1
            logger.exception(
                "obsidian /sync failed for path=%s vault=%s",
                note.path,
                payload.vault_id,
            )
            items.append(
                SyncAckItem(path=note.path, status="error", error=str(exc)[:300])
            )

    if notification is not None:
        try:
            await _finish_obsidian_sync_notification(
                session,
                notification=notification,
                indexed=indexed,
                failed=failed,
            )
        except Exception:
            logger.warning(
                "obsidian sync notification finish failed connector=%s user=%s",
                connector.id,
                user.id,
                exc_info=True,
            )

    return SyncAck(
        vault_id=payload.vault_id,
        indexed=indexed,
        failed=failed,
        items=items,
    )
|
||||
|
||||
|
||||
@router.post("/rename", response_model=RenameAck)
async def obsidian_rename(
    payload: RenameBatchRequest,
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
) -> RenameAck:
    """Apply a batch of vault rename events, acknowledging each one.

    Each rename is reported as ``ok`` (with the document id), ``missing``
    (``rename_note`` returned ``None``) or ``error`` (an exception was
    raised; it is logged and the batch continues).

    Raises:
        HTTPException: 404 when the vault has no registered connector.
    """
    connector = await _resolve_vault_connector(
        session, user=user, vault_id=payload.vault_id
    )

    items: list[RenameAckItem] = []
    renamed = 0
    missing = 0

    for item in payload.renames:
        try:
            doc = await rename_note(
                session,
                connector=connector,
                old_path=item.old_path,
                new_path=item.new_path,
                vault_id=payload.vault_id,
            )
            if doc is not None:
                renamed += 1
                ack = RenameAckItem(
                    old_path=item.old_path,
                    new_path=item.new_path,
                    status="ok",
                    document_id=doc.id,
                )
            else:
                missing += 1
                ack = RenameAckItem(
                    old_path=item.old_path,
                    new_path=item.new_path,
                    status="missing",
                )
            items.append(ack)
        except Exception as exc:
            logger.exception(
                "obsidian /rename failed for old=%s new=%s vault=%s",
                item.old_path,
                item.new_path,
                payload.vault_id,
            )
            items.append(
                RenameAckItem(
                    old_path=item.old_path,
                    new_path=item.new_path,
                    status="error",
                    error=str(exc)[:300],
                )
            )

    return RenameAck(
        vault_id=payload.vault_id,
        renamed=renamed,
        missing=missing,
        items=items,
    )
|
||||
|
||||
|
||||
@router.delete("/notes", response_model=DeleteAck)
async def obsidian_delete_notes(
    payload: DeleteBatchRequest,
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
) -> DeleteAck:
    """Soft-delete a batch of notes addressed by vault-relative path.

    Each path is acknowledged as ``ok``, ``missing`` (``delete_note``
    returned a falsy value) or ``error`` (an exception was raised; it is
    logged and the batch continues).

    Raises:
        HTTPException: 404 when the vault has no registered connector.
    """
    connector = await _resolve_vault_connector(
        session, user=user, vault_id=payload.vault_id
    )

    items: list[DeleteAckItem] = []
    deleted = 0
    missing = 0

    for path in payload.paths:
        try:
            removed = await delete_note(
                session,
                connector=connector,
                vault_id=payload.vault_id,
                path=path,
            )
            if not removed:
                missing += 1
                ack = DeleteAckItem(path=path, status="missing")
            else:
                deleted += 1
                ack = DeleteAckItem(path=path, status="ok")
            items.append(ack)
        except Exception as exc:
            logger.exception(
                "obsidian DELETE /notes failed for path=%s vault=%s",
                path,
                payload.vault_id,
            )
            items.append(DeleteAckItem(path=path, status="error", error=str(exc)[:300]))

    return DeleteAck(
        vault_id=payload.vault_id,
        deleted=deleted,
        missing=missing,
        items=items,
    )
|
||||
|
||||
|
||||
@router.get("/manifest", response_model=ManifestResponse)
async def obsidian_manifest(
    vault_id: str = Query(..., description="Plugin-side stable vault UUID"),
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
) -> ManifestResponse:
    """Serve the ``{path: {hash, mtime}}`` manifest for the plugin's onload reconcile diff.

    Raises:
        HTTPException: 404 when the vault has no registered connector.
    """
    owning_connector = await _resolve_vault_connector(
        session, user=user, vault_id=vault_id
    )
    manifest = await get_manifest(
        session, connector=owning_connector, vault_id=vault_id
    )
    return manifest
|
||||
|
||||
|
||||
@router.get("/stats", response_model=StatsResponse)
async def obsidian_stats(
    vault_id: str = Query(..., description="Plugin-side stable vault UUID"),
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
) -> StatsResponse:
    """Report the active-note count and last sync time for the web tile.

    ``files_synced`` counts only documents without a ``deleted_at`` metadata
    value, so tombstones are excluded and the number matches ``/manifest``;
    ``last_sync_at`` is taken over all of the connector's documents, so
    deletes still advance the freshness signal.

    Raises:
        HTTPException: 404 when the vault has no registered connector.
    """
    connector = await _resolve_vault_connector(session, user=user, vault_id=vault_id)

    not_tombstoned = Document.document_metadata["deleted_at"].as_string().is_(None)
    stats_query = select(
        # CASE without ELSE yields NULL for tombstones, which COUNT skips.
        func.count(case((not_tombstoned, 1))).label("files_synced"),
        func.max(Document.updated_at).label("last_sync_at"),
    ).where(
        Document.connector_id == connector.id,
        Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
    )
    result = await session.execute(stats_query)
    files_synced, last_sync_at = result.first()

    return StatsResponse(
        vault_id=vault_id,
        files_synced=int(files_synced or 0),
        last_sync_at=last_sync_at,
    )
|
||||
|
|
@ -1058,25 +1058,6 @@ async def index_connector_content(
|
|||
)
|
||||
response_message = "Web page indexing started in the background."
|
||||
|
||||
elif connector.connector_type == SearchSourceConnectorType.OBSIDIAN_CONNECTOR:
|
||||
from app.config import config as app_config
|
||||
from app.tasks.celery_tasks.connector_tasks import index_obsidian_vault_task
|
||||
|
||||
# Obsidian connector only available in self-hosted mode
|
||||
if not app_config.is_self_hosted():
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Obsidian connector is only available in self-hosted mode",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Triggering Obsidian vault indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
|
||||
)
|
||||
index_obsidian_vault_task.delay(
|
||||
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
|
||||
)
|
||||
response_message = "Obsidian vault indexing started in the background."
|
||||
|
||||
elif (
|
||||
connector.connector_type
|
||||
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
|
||||
|
|
@ -2549,59 +2530,6 @@ async def run_bookstack_indexing(
|
|||
)
|
||||
|
||||
|
||||
# Add new helper functions for Obsidian indexing
|
||||
async def run_obsidian_indexing_with_new_session(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """Run Obsidian indexing inside a freshly opened database session.

    Opens a session from ``async_session_maker``, delegates to
    ``run_obsidian_indexing`` and logs the start and end of the task.
    """
    logger.info(
        f"Background task started: Indexing Obsidian connector {connector_id} into space {search_space_id} from {start_date} to {end_date}"
    )
    async with async_session_maker() as db_session:
        await run_obsidian_indexing(
            session=db_session,
            connector_id=connector_id,
            search_space_id=search_space_id,
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
        )
    logger.info(f"Background task finished: Indexing Obsidian connector {connector_id}")
|
||||
|
||||
|
||||
async def run_obsidian_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """Index an Obsidian vault through the shared notification-aware runner.

    Args:
        session: Database session
        connector_id: ID of the Obsidian connector
        search_space_id: ID of the search space
        user_id: ID of the user
        start_date: Start date for indexing
        end_date: End date for indexing
    """
    # Imported at call time rather than at module import time.
    from app.tasks.connector_indexers import index_obsidian_vault

    runner_kwargs = {
        "session": session,
        "connector_id": connector_id,
        "search_space_id": search_space_id,
        "user_id": user_id,
        "start_date": start_date,
        "end_date": end_date,
        "indexing_function": index_obsidian_vault,
        "update_timestamp_func": _update_connector_timestamp_by_id,
        "supports_heartbeat_callback": True,
    }
    await _run_indexing_with_notifications(**runner_kwargs)
|
||||
|
||||
|
||||
async def run_composio_indexing_with_new_session(
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
|
|
|
|||
|
|
@ -1,59 +0,0 @@
|
|||
"""
|
||||
Obsidian Connector Credentials Schema.
|
||||
|
||||
Obsidian is a local-first note-taking app that stores notes as markdown files.
|
||||
This connector supports indexing from local file system (self-hosted only).
|
||||
"""
|
||||
|
||||
from pydantic import BaseModel, field_validator
|
||||
|
||||
|
||||
class ObsidianAuthCredentialsBase(BaseModel):
    """Configuration for the filesystem-based Obsidian connector.

    An Obsidian vault is a local directory, so this model carries the vault
    path and indexing options rather than API tokens.
    """

    vault_path: str
    vault_name: str | None = None
    # NOTE(review): the ``None`` → default-folders substitution below lives in
    # a validator; presumably it only runs when a value is passed explicitly
    # (pydantic does not validate field defaults unless asked) — confirm.
    exclude_folders: list[str] | None = None
    include_attachments: bool = False

    @field_validator("vault_path")
    @classmethod
    def validate_vault_path(cls, v: str) -> str:
        """Reject empty or whitespace-only paths; return the stripped path."""
        cleaned = v.strip() if v else ""
        if not cleaned:
            raise ValueError("Vault path is required")
        return cleaned

    @field_validator("exclude_folders", mode="before")
    @classmethod
    def parse_exclude_folders(cls, v):
        """Normalise ``exclude_folders``.

        ``None`` becomes the default folder list, a comma-separated string is
        split into trimmed non-empty entries, anything else passes through.
        """
        if v is None:
            return [".trash", ".obsidian", "templates"]
        if isinstance(v, str):
            return [name for name in map(str.strip, v.split(",")) if name]
        return v

    def to_dict(self) -> dict:
        """Return the four configuration fields as a plain dict for storage."""
        stored_fields = (
            "vault_path",
            "vault_name",
            "exclude_folders",
            "include_attachments",
        )
        return {field: getattr(self, field) for field in stored_fields}

    @classmethod
    def from_dict(cls, data: dict) -> "ObsidianAuthCredentialsBase":
        """Build credentials from a stored dict, defaulting any missing keys."""
        kwargs = {
            "vault_path": data.get("vault_path", ""),
            "vault_name": data.get("vault_name"),
            "exclude_folders": data.get("exclude_folders"),
            "include_attachments": data.get("include_attachments", False),
        }
        return cls(**kwargs)
|
||||
234
surfsense_backend/app/schemas/obsidian_plugin.py
Normal file
234
surfsense_backend/app/schemas/obsidian_plugin.py
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
"""Wire schemas spoken between the SurfSense Obsidian plugin and the backend.
|
||||
|
||||
All schemas inherit ``extra='ignore'`` from :class:`_PluginBase` so additive
|
||||
field changes never break either side; hard breaks live behind a new URL
|
||||
prefix (``/api/v2/...``).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, model_validator
|
||||
|
||||
# Shared pydantic config: fields not declared on a schema are ignored rather
# than rejected.
_PLUGIN_MODEL_CONFIG = ConfigDict(extra="ignore")


# Source of truth for the attachment whitelist. Mirrors MIME_BY_EXTENSION in
# surfsense_obsidian/src/sync-engine.ts — keep in sync.
# Keys are lowercase file extensions without the leading dot; values are the
# MIME type associated with that extension.
ATTACHMENT_MIME_TYPES: dict[str, str] = {
    "pdf": "application/pdf",
    "png": "image/png",
    "jpg": "image/jpeg",
    "jpeg": "image/jpeg",
    "gif": "image/gif",
    "webp": "image/webp",
    "svg": "image/svg+xml",
    "txt": "text/plain",
}
# Immutable set of the extensions (the dict's keys) accepted as attachments.
ALLOWED_ATTACHMENT_EXTENSIONS: frozenset[str] = frozenset(ATTACHMENT_MIME_TYPES)
|
||||
|
||||
|
||||
class _PluginBase(BaseModel):
    """Base schema carrying the shared forward-compatibility config."""

    # Subclasses pick up ``extra="ignore"`` from the shared config object, so
    # unknown fields in a payload are dropped instead of failing validation.
    model_config = _PLUGIN_MODEL_CONFIG
|
||||
|
||||
|
||||
class HeadingRef(_PluginBase):
    """One markdown heading extracted from Obsidian metadata cache."""

    # Heading text.
    # NOTE(review): presumably without the leading '#' markers — confirm
    # against the plugin.
    heading: str
    # Heading depth; validation restricts it to 1..6 inclusive.
    level: int = Field(ge=1, le=6)
|
||||
|
||||
|
||||
class NotePayload(_PluginBase):
    """One Obsidian note as pushed by the plugin (the source of truth)."""

    # --- Identity and body ------------------------------------------------
    vault_id: str = Field(
        ..., description="Stable plugin-generated UUID for this vault"
    )
    path: str = Field(..., description="Vault-relative path, e.g. 'notes/foo.md'")
    name: str = Field(..., description="File stem (no extension)")
    extension: str = Field(
        default="md", description="File extension without leading dot"
    )
    content: str = Field(default="", description="Raw markdown body (post-frontmatter)")

    # --- Structured note metadata; each defaults to an empty container ----
    frontmatter: dict[str, Any] = Field(default_factory=dict)
    tags: list[str] = Field(default_factory=list)
    headings: list[HeadingRef] = Field(default_factory=list)
    resolved_links: list[str] = Field(default_factory=list)
    unresolved_links: list[str] = Field(default_factory=list)
    embeds: list[str] = Field(default_factory=list)
    aliases: list[str] = Field(default_factory=list)

    # --- Change detection and binary-attachment fields --------------------
    content_hash: str = Field(
        ..., description="Plugin-computed SHA-256 of the raw content"
    )
    is_binary: bool = Field(
        default=False,
        description=(
            "True when payload represents a non-markdown attachment. "
            "If set, the plugin may include binary_base64 for ETL extraction."
        ),
    )
    binary_base64: str | None = Field(
        default=None,
        description=(
            "Base64-encoded raw file bytes for binary attachments. "
            "Used by the backend ETL pipeline."
        ),
    )
    mime_type: str | None = Field(
        default=None,
        description="Optional MIME type hint for binary attachments.",
    )
    size: int | None = Field(
        default=None,
        ge=0,
        description="Byte size of the local file (mtime+size short-circuit signal). Optional for forward compatibility.",
    )
    # File timestamps; both are required.
    mtime: datetime
    ctime: datetime

    @model_validator(mode="after")
    def _enforce_binary_invariants(self) -> NotePayload:
        """Check that the binary fields agree with ``is_binary``.

        When ``is_binary`` is true, both ``binary_base64`` and ``mime_type``
        must be non-empty. When it is false, both must be ``None``.

        Raises:
            ValueError: If either rule is violated.
        """
        if self.is_binary:
            # Truthiness check: an empty string counts as missing.
            if not self.binary_base64:
                raise ValueError("binary_base64 is required when is_binary is True")
            if not self.mime_type:
                raise ValueError("mime_type is required when is_binary is True")
        elif self.binary_base64 is not None or self.mime_type is not None:
            # Identity check: even an empty string is rejected for text notes.
            raise ValueError(
                "binary_base64 and mime_type must be omitted when is_binary is False",
            )
        return self
|
||||
|
||||
|
||||
class SyncBatchRequest(_PluginBase):
    """Batch upsert; plugin sends 10-20 notes per request."""

    # Vault the batch is addressed to.
    vault_id: str
    # Notes to upsert. Defaults to an empty list; validation rejects batches
    # with more than 100 entries.
    notes: list[NotePayload] = Field(default_factory=list, max_length=100)
|
||||
|
||||
|
||||
class RenameItem(_PluginBase):
    # One rename operation: the path a file had and the path it has now.
    # NOTE(review): presumably vault-relative paths like NotePayload.path —
    # confirm against the plugin.
    old_path: str
    new_path: str
|
||||
|
||||
|
||||
class RenameBatchRequest(_PluginBase):
    # Batch of renames for one vault.
    vault_id: str
    # Defaults to an empty list; validation rejects more than 200 entries.
    renames: list[RenameItem] = Field(default_factory=list, max_length=200)
|
||||
|
||||
|
||||
class DeleteBatchRequest(_PluginBase):
    # Batch of deletions for one vault.
    vault_id: str
    # Paths to delete. Defaults to an empty list; validation rejects more
    # than 500 entries.
    paths: list[str] = Field(default_factory=list, max_length=500)
|
||||
|
||||
|
||||
class ManifestEntry(_PluginBase):
    # One manifest row: what the server last recorded for a single path.
    # Content hash.
    # NOTE(review): presumably the plugin-computed hash so the plugin can
    # compare it locally — confirm against the manifest builder.
    hash: str
    # Last recorded modification time of the file.
    mtime: datetime
    # Unlike NotePayload.size, no ``ge=0`` bound is declared here.
    size: int | None = Field(
        default=None,
        description="Byte size last seen by the server. Enables mtime+size short-circuit; absent when not yet recorded.",
    )
|
||||
|
||||
|
||||
class ManifestResponse(_PluginBase):
    """Path-keyed manifest of every non-deleted note for a vault."""

    # Vault the manifest describes.
    vault_id: str
    # Maps each note path to its manifest entry; empty dict by default.
    items: dict[str, ManifestEntry] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class ConnectRequest(_PluginBase):
    """Vault registration / heartbeat. Replayed on every plugin onload."""

    # Identifier and display name of the vault being registered.
    vault_id: str
    vault_name: str
    # Search space the vault should be attached to.
    search_space_id: int
    # Required fingerprint used for cross-device deduplication (see the field
    # description below).
    vault_fingerprint: str = Field(
        ...,
        description=(
            "Deterministic SHA-256 over the sorted markdown paths in the vault "
            "(plus vault_name). Same vault content on any device produces the "
            "same value; the server uses it to dedup connectors across devices."
        ),
    )
|
||||
|
||||
|
||||
class ConnectResponse(_PluginBase):
    """Carries the same handshake fields as ``HealthResponse`` so the plugin
    learns the contract without a separate ``GET /health`` round-trip."""

    # Identifiers of the connector row, the vault, and its search space.
    connector_id: int
    vault_id: str
    search_space_id: int
    # Handshake fields, duplicated from HealthResponse.
    capabilities: list[str]
    server_time_utc: datetime
|
||||
|
||||
|
||||
class HealthResponse(_PluginBase):
    """API contract handshake. ``capabilities`` is additive-only string list."""

    # Capability names advertised by the server.
    capabilities: list[str]
    # Server clock at response time.
    # NOTE(review): the name implies UTC — confirm at the producing route.
    server_time_utc: datetime
|
||||
|
||||
|
||||
# Per-item batch ack schemas — wire shape is load-bearing for the plugin
|
||||
# queue (see api-client.ts / sync-engine.ts:processBatch).
|
||||
|
||||
|
||||
class SyncAckItem(_PluginBase):
    # Per-note result of a sync batch.
    path: str
    # Outcome; limited to the three literal values below.
    status: Literal["ok", "queued", "error"]
    # Optional document id and error text; both default to None.
    document_id: int | None = None
    error: str | None = None
|
||||
|
||||
|
||||
class SyncAck(_PluginBase):
    # Batch-level acknowledgement for a sync request.
    vault_id: str
    # Summary counters.
    # NOTE(review): presumably tallies of the per-item statuses — confirm at
    # the producing route.
    indexed: int
    failed: int
    # Per-note results; empty list by default.
    items: list[SyncAckItem] = Field(default_factory=list)
|
||||
|
||||
|
||||
class RenameAckItem(_PluginBase):
    # Per-rename result, echoing the requested old and new paths.
    old_path: str
    new_path: str
    # ``missing`` is treated as success client-side (end state reached).
    status: Literal["ok", "error", "missing"]
    # Optional document id and error text; both default to None.
    document_id: int | None = None
    error: str | None = None
|
||||
|
||||
|
||||
class RenameAck(_PluginBase):
    # Batch-level acknowledgement for a rename request.
    vault_id: str
    # Summary counters.
    # NOTE(review): presumably counts of "ok" and "missing" items — confirm
    # at the producing route.
    renamed: int
    missing: int
    # Per-rename results; empty list by default.
    items: list[RenameAckItem] = Field(default_factory=list)
|
||||
|
||||
|
||||
class DeleteAckItem(_PluginBase):
    # Per-path result of a delete batch.
    path: str
    # Outcome; limited to the three literal values below.
    status: Literal["ok", "error", "missing"]
    # Optional error text; defaults to None.
    error: str | None = None
|
||||
|
||||
|
||||
class DeleteAck(_PluginBase):
    # Batch-level acknowledgement for a delete request.
    vault_id: str
    # Summary counters.
    # NOTE(review): presumably counts of "ok" and "missing" items — confirm
    # at the producing route.
    deleted: int
    missing: int
    # Per-path results; empty list by default.
    items: list[DeleteAckItem] = Field(default_factory=list)
|
||||
|
||||
|
||||
class StatsResponse(_PluginBase):
    """Backs the Obsidian connector tile in the web UI."""

    # Vault the statistics describe.
    vault_id: str
    # Number of synced files reported for the vault.
    files_synced: int
    # Time of the most recent sync; defaults to None.
    last_sync_at: datetime | None = None
|
||||
616
surfsense_backend/app/services/obsidian_plugin_indexer.py
Normal file
616
surfsense_backend/app/services/obsidian_plugin_indexer.py
Normal file
|
|
@ -0,0 +1,616 @@
|
|||
"""
|
||||
Obsidian plugin indexer service.
|
||||
|
||||
Bridges the SurfSense Obsidian plugin's HTTP payloads
|
||||
(see ``app/schemas/obsidian_plugin.py``) into the shared
|
||||
``IndexingPipelineService``.
|
||||
|
||||
Responsibilities:
|
||||
|
||||
- ``upsert_note`` — push one note through the indexing pipeline; respects
|
||||
unchanged content (skip) and version-snapshots existing rows before
|
||||
rewrite.
|
||||
- ``rename_note`` — rewrite path-derived fields (path metadata,
|
||||
``unique_identifier_hash``, ``source_url``) without re-indexing content.
|
||||
- ``delete_note`` — soft delete with a tombstone in ``document_metadata``
|
||||
so reconciliation can distinguish "user explicitly killed this in the UI"
|
||||
from "plugin hasn't synced yet".
|
||||
- ``get_manifest`` — return ``{path: {hash, mtime, size}}`` for every
|
||||
non-deleted note belonging to a vault, used by the plugin's reconcile
|
||||
pass on ``onload``.
|
||||
|
||||
Design notes
|
||||
------------
|
||||
|
||||
The plugin's content hash and the backend's ``content_hash`` are computed
|
||||
differently (plugin uses raw SHA-256 of the markdown body; backend salts
|
||||
with ``search_space_id``). We persist the plugin's hash in
|
||||
``document_metadata['plugin_content_hash']`` so the manifest endpoint can
|
||||
return what the plugin sent — that's the only number the plugin can
|
||||
compare without re-downloading content.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
from urllib.parse import quote
|
||||
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentStatus,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
)
|
||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||
from app.schemas.obsidian_plugin import (
|
||||
ManifestEntry,
|
||||
ManifestResponse,
|
||||
NotePayload,
|
||||
)
|
||||
from app.utils.document_converters import generate_unique_identifier_hash
|
||||
from app.utils.document_versioning import create_version_snapshot
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _vault_path_unique_id(vault_id: str, path: str) -> str:
|
||||
"""Stable identifier for a note. Vault-scoped so the same path under two
|
||||
different vaults doesn't collide."""
|
||||
return f"{vault_id}:{path}"
|
||||
|
||||
|
||||
def _build_source_url(vault_name: str, path: str) -> str:
|
||||
"""Build the ``obsidian://`` deep link for the web UI's "Open in Obsidian"
|
||||
button. Both segments are URL-encoded because vault names and paths can
|
||||
contain spaces, ``#``, ``?``, etc.
|
||||
"""
|
||||
return (
|
||||
"obsidian://open"
|
||||
f"?vault={quote(vault_name, safe='')}"
|
||||
f"&file={quote(path, safe='')}"
|
||||
)
|
||||
|
||||
|
||||
def _build_metadata(
    payload: NotePayload,
    *,
    vault_name: str,
    connector_id: int,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the ``document_metadata`` dict for a pushed note.

    Args:
        payload: Note as sent by the plugin.
        vault_name: Vault display name, stored and used for the deep link.
        connector_id: Id of the owning connector, stored under ``connector_id``.
        extra: Optional extra keys merged last, overriding earlier keys of
            the same name.

    Returns:
        A new dict with the payload fields, the plugin hash and size, the
        ISO-formatted timestamps and the ``obsidian://`` URL. ``is_binary``
        and ``mime_type`` are added only for binary payloads.
    """
    heading_dicts = [heading.model_dump() for heading in payload.headings]
    document_meta: dict[str, Any] = {
        "source": "plugin",
        "vault_id": payload.vault_id,
        "vault_name": vault_name,
        "file_path": payload.path,
        "file_name": payload.name,
        "extension": payload.extension,
        "frontmatter": payload.frontmatter,
        "tags": payload.tags,
        "headings": heading_dicts,
        "outgoing_links": payload.resolved_links,
        "unresolved_links": payload.unresolved_links,
        "embeds": payload.embeds,
        "aliases": payload.aliases,
        "plugin_content_hash": payload.content_hash,
        "plugin_file_size": payload.size,
        "mtime": payload.mtime.isoformat(),
        "ctime": payload.ctime.isoformat(),
        "connector_id": connector_id,
        "url": _build_source_url(vault_name, payload.path),
    }
    if payload.is_binary:
        document_meta.update({"is_binary": True, "mime_type": payload.mime_type})
    if extra:
        document_meta.update(extra)
    return document_meta
|
||||
|
||||
|
||||
def _build_document_string(
|
||||
payload: NotePayload, vault_name: str, *, content_override: str | None = None
|
||||
) -> str:
|
||||
"""Compose the indexable string the pipeline embeds and chunks.
|
||||
|
||||
Mirrors the legacy obsidian indexer's METADATA + CONTENT framing so
|
||||
existing search relevance heuristics keep working unchanged.
|
||||
"""
|
||||
tags_line = ", ".join(payload.tags) if payload.tags else "None"
|
||||
links_line = ", ".join(payload.resolved_links) if payload.resolved_links else "None"
|
||||
body = payload.content if content_override is None else content_override
|
||||
return (
|
||||
"<METADATA>\n"
|
||||
f"Title: {payload.name}\n"
|
||||
f"Vault: {vault_name}\n"
|
||||
f"Path: {payload.path}\n"
|
||||
f"Tags: {tags_line}\n"
|
||||
f"Links to: {links_line}\n"
|
||||
"</METADATA>\n\n"
|
||||
"<CONTENT>\n"
|
||||
f"{body}\n"
|
||||
"</CONTENT>\n"
|
||||
)
|
||||
|
||||
|
||||
async def _extract_binary_attachment_markdown(
    payload: NotePayload, *, vision_llm
) -> tuple[str, dict[str, Any]]:
    """Decode a binary attachment and run it through the ETL pipeline.

    The base64 payload is written to a temporary file whose suffix comes from
    ``payload.extension``, passed to ``_run_etl_extract``, and the temporary
    file is removed afterwards.

    Args:
        payload: Attachment payload; ``binary_base64``, ``extension``,
            ``path`` and ``name`` are read.
        vision_llm: Forwarded unchanged to the ETL pipeline (may be ``None``).

    Returns:
        ``(markdown, metadata)``. On failure the markdown is ``""`` and
        ``metadata["attachment_extraction_status"]`` is
        ``"invalid_binary_payload"`` or ``"etl_failed"``; neither failure is
        raised to the caller.
    """
    try:
        raw_bytes = base64.b64decode(payload.binary_base64, validate=True)
    except (ValueError, TypeError):
        # binascii.Error subclasses ValueError (malformed base64); TypeError
        # covers a missing or non-string payload.
        logger.warning("obsidian attachment payload had invalid base64: %s", payload.path)
        return "", {"attachment_extraction_status": "invalid_binary_payload"}

    # NOTE(review): the extension comes from the client payload and is used
    # verbatim in the temp-file suffix — confirm callers restrict it to the
    # attachment whitelist.
    suffix = f".{payload.extension.lstrip('.')}"
    temp_path: str | None = None
    filename = payload.path.rsplit("/", 1)[-1] or payload.name
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            # Record the path before writing so the ``finally`` cleanup still
            # removes the file if the write fails (e.g. disk full).
            temp_path = tmp.name
            tmp.write(raw_bytes)

        result = await _run_etl_extract(
            file_path=temp_path,
            filename=filename,
            vision_llm=vision_llm,
        )
        metadata: dict[str, Any] = {
            "attachment_extraction_status": "ok",
            "attachment_etl_service": result.etl_service,
            "attachment_content_type": result.content_type,
        }
        return result.markdown_content, metadata
    except Exception as exc:
        # Deliberate best-effort boundary: any ETL failure is reported via
        # metadata so the caller decides whether to index.
        logger.warning(
            "obsidian attachment ETL failed for %s: %s", payload.path, exc, exc_info=True
        )
        return "", {
            "attachment_extraction_status": "etl_failed",
            "attachment_extraction_error": str(exc)[:300],
        }
    finally:
        if temp_path and os.path.exists(temp_path):
            with contextlib.suppress(Exception):
                os.unlink(temp_path)
|
||||
|
||||
|
||||
async def _run_etl_extract(*, file_path: str, filename: str, vision_llm):
    """Run the ETL pipeline on one file and return its result.

    The ETL modules are imported inside the function to avoid module-import
    cycles.
    """
    from app.etl_pipeline.etl_document import EtlRequest
    from app.etl_pipeline.etl_pipeline_service import EtlPipelineService

    etl_service = EtlPipelineService(vision_llm=vision_llm)
    etl_request = EtlRequest(file_path=file_path, filename=filename)
    return await etl_service.extract(etl_request)
|
||||
|
||||
|
||||
def _is_image_attachment(payload: NotePayload) -> bool:
|
||||
ext = payload.extension.lower().lstrip(".")
|
||||
return ext in {"png", "jpg", "jpeg", "gif", "webp", "svg"}
|
||||
|
||||
|
||||
async def _resolve_attachment_vision_llm(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
connector: SearchSourceConnector,
|
||||
search_space_id: int,
|
||||
payload: NotePayload,
|
||||
):
|
||||
"""Match connector indexers: only fetch vision LLM for image attachments
|
||||
when the connector has vision indexing enabled."""
|
||||
if not payload.is_binary:
|
||||
return None
|
||||
if not _is_image_attachment(payload):
|
||||
return None
|
||||
if not getattr(connector, "enable_vision_llm", False):
|
||||
return None
|
||||
|
||||
from app.services.llm_service import get_vision_llm
|
||||
|
||||
return await get_vision_llm(session, search_space_id)
|
||||
|
||||
|
||||
async def _resolve_summary_llm(
|
||||
session: AsyncSession, *, user_id: str, search_space_id: int, should_summarize: bool
|
||||
):
|
||||
"""Fetch summary LLM only when indexing summary is enabled."""
|
||||
if not should_summarize:
|
||||
return None
|
||||
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
|
||||
return await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
|
||||
|
||||
def _require_extracted_attachment_content(
|
||||
*, content: str, etl_meta: dict[str, Any], path: str
|
||||
) -> str:
|
||||
extracted = content.strip()
|
||||
if extracted:
|
||||
return extracted
|
||||
|
||||
status = etl_meta.get("attachment_extraction_status", "unknown")
|
||||
reason = etl_meta.get("attachment_extraction_error")
|
||||
if reason:
|
||||
raise RuntimeError(
|
||||
f"Attachment extraction failed for {path} ({status}): {reason}"
|
||||
)
|
||||
raise RuntimeError(f"Attachment extraction failed for {path} ({status})")
|
||||
|
||||
|
||||
async def _find_existing_document(
    session: AsyncSession,
    *,
    search_space_id: int,
    vault_id: str,
    path: str,
) -> Document | None:
    """Look up the document stored for ``vault_id``/``path``, if any.

    The lookup matches on ``unique_identifier_hash``, computed from the
    Obsidian document type, the vault-scoped path id and the search space.
    Returns the first match or ``None``.
    """
    lookup_hash = generate_unique_identifier_hash(
        DocumentType.OBSIDIAN_CONNECTOR,
        _vault_path_unique_id(vault_id, path),
        search_space_id,
    )
    query = select(Document).where(Document.unique_identifier_hash == lookup_hash)
    rows = await session.execute(query)
    return rows.scalars().first()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def upsert_note(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    payload: NotePayload,
    user_id: str,
) -> Document:
    """Index or refresh a single note pushed by the plugin.

    Returns the resulting ``Document`` (whether newly created, updated, or
    a skip-because-unchanged hit).

    Args:
        session: Database session used for the lookup and the pipeline.
        connector: Owning connector; supplies the vault name, the search
            space and the ``enable_summary`` flag.
        payload: Note (or binary attachment) sent by the plugin.
        user_id: Id recorded as the document creator and used to pick the
            summary LLM.

    Raises:
        RuntimeError: If a binary attachment yields no extracted text, or if
            the pipeline rejects a note that has no existing document.
    """
    # Falls back to "Vault" when the connector config has no vault name.
    vault_name: str = (connector.config or {}).get("vault_name") or "Vault"
    search_space_id = connector.search_space_id

    existing = await _find_existing_document(
        session,
        search_space_id=search_space_id,
        vault_id=payload.vault_id,
        path=payload.path,
    )

    plugin_hash = payload.content_hash
    if existing is not None:
        existing_meta = existing.document_metadata or {}
        was_tombstoned = bool(existing_meta.get("deleted_at"))

        # Skip re-indexing only when the row is live, the plugin hash is
        # unchanged and the document is READY. A tombstoned row is always
        # re-indexed.
        if (
            not was_tombstoned
            and existing_meta.get("plugin_content_hash") == plugin_hash
            and DocumentStatus.is_state(existing.status, DocumentStatus.READY)
        ):
            return existing

        # Best-effort: a failed snapshot is logged at debug level and does
        # not block the re-index.
        try:
            await create_version_snapshot(session, existing)
        except Exception:
            logger.debug(
                "version snapshot failed for obsidian doc %s",
                existing.id,
                exc_info=True,
            )

    content_for_index = payload.content
    extra_meta: dict[str, Any] = {}
    vision_llm = None
    if payload.is_binary:
        # Binary attachments are indexed from ETL-extracted text rather than
        # ``payload.content``.
        vision_llm = await _resolve_attachment_vision_llm(
            session,
            connector=connector,
            search_space_id=search_space_id,
            payload=payload,
        )
        content_for_index, etl_meta = await _extract_binary_attachment_markdown(
            payload, vision_llm=vision_llm
        )
        extra_meta.update(etl_meta)
        # Strict KB behavior: do not index metadata-only attachments.
        content_for_index = _require_extracted_attachment_content(
            content=content_for_index,
            etl_meta=etl_meta,
            path=payload.path,
        )

    # ``None`` when the connector has summaries disabled.
    llm = await _resolve_summary_llm(
        session,
        user_id=str(user_id),
        search_space_id=search_space_id,
        should_summarize=connector.enable_summary,
    )

    document_string = _build_document_string(
        payload, vault_name, content_override=content_for_index
    )
    metadata = _build_metadata(
        payload,
        vault_name=vault_name,
        connector_id=connector.id,
        extra=extra_meta,
    )

    connector_doc = ConnectorDocument(
        title=payload.name,
        source_markdown=document_string,
        unique_id=_vault_path_unique_id(payload.vault_id, payload.path),
        document_type=DocumentType.OBSIDIAN_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector.id,
        created_by_id=str(user_id),
        should_summarize=connector.enable_summary,
        fallback_summary=f"Obsidian Note: {payload.name}\n\n{content_for_index}",
        metadata=metadata,
    )

    pipeline = IndexingPipelineService(session)
    prepared = await pipeline.prepare_for_indexing([connector_doc])
    if not prepared:
        # Nothing was prepared: return the stored row if there is one,
        # otherwise treat it as a rejection.
        if existing is not None:
            return existing
        raise RuntimeError(f"Indexing pipeline rejected obsidian note {payload.path}")

    document = prepared[0]

    return await pipeline.index(document, connector_doc, llm)
|
||||
|
||||
|
||||
async def rename_note(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    old_path: str,
    new_path: str,
    vault_id: str,
) -> Document | None:
    """Move a stored note to ``new_path`` without re-indexing its content.

    Updates the identifier hash, the title and the path-derived metadata
    (``file_path``, ``file_name``, ``url``), then commits.

    Returns:
        The updated document; ``None`` when nothing is stored under
        ``old_path`` (a never-synced file — the next sync creates it under
        the new path); or the untouched document when another row already
        occupies ``new_path``.
    """
    vault_name: str = (connector.config or {}).get("vault_name") or "Vault"
    search_space_id = connector.search_space_id

    doc = await _find_existing_document(
        session,
        search_space_id=search_space_id,
        vault_id=vault_id,
        path=old_path,
    )
    if doc is None:
        return None

    target_hash = generate_unique_identifier_hash(
        DocumentType.OBSIDIAN_CONNECTOR,
        _vault_path_unique_id(vault_id, new_path),
        search_space_id,
    )

    # Refuse to overwrite a different row that already owns the target hash.
    clash_query = select(Document).where(
        and_(
            Document.unique_identifier_hash == target_hash,
            Document.id != doc.id,
        )
    )
    clash = (await session.execute(clash_query)).scalars().first()
    if clash is not None:
        logger.warning(
            "obsidian rename target already exists "
            "(vault=%s old=%s new=%s); skipping rename so the next /sync "
            "can resolve the conflict via content_hash",
            vault_id,
            old_path,
            new_path,
        )
        return doc

    # Final path segment with its last extension removed; rsplit returns the
    # whole name unchanged when it contains no dot.
    basename = new_path.rsplit("/", 1)[-1]
    stem = basename.rsplit(".", 1)[0]

    updated_meta = dict(doc.document_metadata or {})
    updated_meta["file_path"] = new_path
    updated_meta["file_name"] = stem
    updated_meta["url"] = _build_source_url(vault_name, new_path)

    doc.unique_identifier_hash = target_hash
    doc.title = stem
    doc.document_metadata = updated_meta
    doc.updated_at = datetime.now(UTC)

    await session.commit()
    return doc
|
||||
|
||||
|
||||
async def delete_note(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    vault_id: str,
    path: str,
) -> bool:
    """Tombstone a note by stamping ``deleted_at`` into its metadata.

    The row and its chunks are kept, so existing citations in chat threads
    remain resolvable. Tombstoned rows are filtered out of the manifest, so
    the plugin's reconcile pass will not try to resurrect the note.

    Returns:
        True if a matching row exists (newly or already tombstoned), False
        if no row matched.
    """
    doc = await _find_existing_document(
        session,
        search_space_id=connector.search_space_id,
        vault_id=vault_id,
        path=path,
    )
    if doc is None:
        return False

    tombstone = dict(doc.document_metadata or {})
    if not tombstone.get("deleted_at"):
        tombstone["deleted_at"] = datetime.now(UTC).isoformat()
        tombstone["deleted_by_source"] = "plugin"
        doc.document_metadata = tombstone
        doc.updated_at = datetime.now(UTC)
        await session.commit()
    # An already-tombstoned row also counts as deleted; nothing is rewritten.
    return True
|
||||
|
||||
|
||||
async def merge_obsidian_connectors(
    session: AsyncSession,
    *,
    source: SearchSourceConnector,
    target: SearchSourceConnector,
) -> None:
    """Move ``source``'s documents onto ``target``, then delete ``source``.

    Used when fingerprint dedup finds two plugin connectors for the same
    vault. A source document whose path already exists on ``target`` (or
    that has no path) is deleted; every other source document is re-homed
    with ``target``'s connector id, search space, vault id and a recomputed
    identifier hash. Does nothing when both arguments are the same connector.

    Raises:
        RuntimeError: If ``target``'s config has no ``vault_id``.
    """
    if source.id == target.id:
        return

    target_vault_id = (target.config or {}).get("vault_id")
    target_search_space_id = target.search_space_id
    if not target_vault_id:
        raise RuntimeError("merge target is missing vault_id")

    target_rows = await session.execute(
        select(Document).where(
            and_(
                Document.connector_id == target.id,
                Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
            )
        )
    )
    # Paths already owned by the surviving connector.
    claimed_paths: set[str] = {
        claimed
        for target_doc in target_rows.scalars().all()
        if (claimed := (target_doc.document_metadata or {}).get("file_path"))
    }

    source_rows = await session.execute(
        select(Document).where(
            and_(
                Document.connector_id == source.id,
                Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
            )
        )
    )

    for source_doc in source_rows.scalars().all():
        doc_meta = dict(source_doc.document_metadata or {})
        doc_path = doc_meta.get("file_path")
        if doc_path and doc_path not in claimed_paths:
            # Re-home the document under the target connector and vault.
            rehomed_hash = generate_unique_identifier_hash(
                DocumentType.OBSIDIAN_CONNECTOR,
                _vault_path_unique_id(target_vault_id, doc_path),
                target_search_space_id,
            )
            doc_meta["vault_id"] = target_vault_id
            doc_meta["connector_id"] = target.id
            source_doc.document_metadata = doc_meta
            source_doc.connector_id = target.id
            source_doc.search_space_id = target_search_space_id
            source_doc.unique_identifier_hash = rehomed_hash
            claimed_paths.add(doc_path)
        else:
            # Duplicate path, or no path at all: the target's copy wins.
            await session.delete(source_doc)

    await session.flush()
    await session.delete(source)
|
||||
|
||||
|
||||
async def get_manifest(
    session: AsyncSession,
    *,
    connector: SearchSourceConnector,
    vault_id: str,
) -> ManifestResponse:
    """Return ``{path: {hash, mtime, size}}`` for every non-deleted note in
    this vault.

    The plugin compares this against its local vault on every ``onload`` to
    catch up edits made while offline. A row is left out when it is
    tombstoned (``deleted_at`` set), belongs to a different ``vault_id``,
    lacks ``file_path`` / ``plugin_content_hash`` / ``mtime``, or has an
    ``mtime`` that cannot be parsed — so the plugin never sees partial data.

    Args:
        session: Database session used for the query.
        connector: Connector whose Obsidian documents are listed.
        vault_id: Vault to filter on; echoed back in the response.

    Returns:
        A ``ManifestResponse`` keyed by note path.
    """
    result = await session.execute(
        select(Document).where(
            and_(
                Document.search_space_id == connector.search_space_id,
                Document.connector_id == connector.id,
                Document.document_type == DocumentType.OBSIDIAN_CONNECTOR,
            )
        )
    )

    items: dict[str, ManifestEntry] = {}
    for doc in result.scalars().all():
        meta = doc.document_metadata or {}
        if meta.get("deleted_at"):
            continue
        if meta.get("vault_id") != vault_id:
            continue
        path = meta.get("file_path")
        plugin_hash = meta.get("plugin_content_hash")
        mtime_raw = meta.get("mtime")
        if not path or not plugin_hash or not mtime_raw:
            continue
        try:
            mtime = datetime.fromisoformat(mtime_raw)
        except (TypeError, ValueError):
            # ``mtime`` comes from free-form JSON metadata: a non-string value
            # raises TypeError, a malformed string raises ValueError. Skip the
            # row rather than fail the whole manifest.
            continue
        size_raw = meta.get("plugin_file_size")
        # Only an integer size is reported; anything else becomes None.
        size = int(size_raw) if isinstance(size_raw, int) else None
        items[path] = ManifestEntry(hash=plugin_hash, mtime=mtime, size=size)

    return ManifestResponse(vault_id=vault_id, items=items)
|
||||
|
|
@ -536,49 +536,6 @@ async def _index_bookstack_pages(
|
|||
)
|
||||
|
||||
|
||||
@celery_app.task(name="index_obsidian_vault", bind=True)
def index_obsidian_vault_task(
    self,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """Celery entry point that indexes an Obsidian vault.

    Runs the async indexing coroutine to completion on a fresh event loop,
    which is closed afterwards whether or not indexing succeeded.
    """
    import asyncio

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    try:
        indexing_coro = _index_obsidian_vault(
            connector_id, search_space_id, user_id, start_date, end_date
        )
        event_loop.run_until_complete(indexing_coro)
    finally:
        event_loop.close()
|
||||
|
||||
|
||||
async def _index_obsidian_vault(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """Open a Celery-scoped session and run Obsidian indexing inside it."""
    from app.routes.search_source_connectors_routes import (
        run_obsidian_indexing,
    )

    session_factory = get_celery_session_maker()
    async with session_factory() as session:
        await run_obsidian_indexing(
            session, connector_id, search_space_id, user_id, start_date, end_date
        )
|
||||
|
||||
|
||||
@celery_app.task(name="index_composio_connector", bind=True)
|
||||
def index_composio_connector_task(
|
||||
self,
|
||||
|
|
|
|||
59
surfsense_backend/app/tasks/celery_tasks/obsidian_tasks.py
Normal file
59
surfsense_backend/app/tasks/celery_tasks/obsidian_tasks.py
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
"""Celery tasks for Obsidian plugin background processing."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from app.celery_app import celery_app
|
||||
from app.db import SearchSourceConnector
|
||||
from app.schemas.obsidian_plugin import NotePayload
|
||||
from app.services.obsidian_plugin_indexer import upsert_note
|
||||
from app.tasks.celery_tasks import get_celery_session_maker
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@celery_app.task(name="index_obsidian_attachment", bind=True)
def index_obsidian_attachment_task(
    self,
    connector_id: int,
    payload_data: dict,
    user_id: str,
) -> None:
    """Index a single Obsidian non-markdown attachment from a sync task.

    The async worker coroutine is run to completion on a dedicated event loop
    that is closed afterwards regardless of the outcome.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        attachment_job = _index_obsidian_attachment(
            connector_id=connector_id,
            payload_data=payload_data,
            user_id=user_id,
        )
        event_loop.run_until_complete(attachment_job)
    finally:
        # Always release the loop, even when the coroutine raises.
        event_loop.close()
|
||||
|
||||
|
||||
async def _index_obsidian_attachment(
    *,
    connector_id: int,
    payload_data: dict,
    user_id: str,
) -> None:
    """Look up the connector, validate the payload, and upsert the note.

    When no connector row exists for ``connector_id`` a warning is logged and
    nothing else happens. The payload is validated only after the connector
    has been found.
    """
    session_factory = get_celery_session_maker()
    async with session_factory() as db_session:
        obsidian_connector = await db_session.get(SearchSourceConnector, connector_id)

        if obsidian_connector is not None:
            note = NotePayload.model_validate(payload_data)
            await upsert_note(
                db_session,
                connector=obsidian_connector,
                payload=note,
                user_id=user_id,
            )
        else:
            logger.warning(
                "obsidian attachment task skipped: connector %s not found", connector_id
            )
|
||||
|
|
@ -14,18 +14,16 @@ from .google_calendar_indexer import index_google_calendar_events
|
|||
from .google_drive_indexer import index_google_drive_files
|
||||
from .google_gmail_indexer import index_google_gmail_messages
|
||||
from .notion_indexer import index_notion_pages
|
||||
from .obsidian_indexer import index_obsidian_vault
|
||||
from .webcrawler_indexer import index_crawled_urls
|
||||
|
||||
__all__ = [
|
||||
"index_bookstack_pages",
|
||||
"index_confluence_pages",
|
||||
"index_crawled_urls",
|
||||
"index_elasticsearch_documents",
|
||||
"index_github_repos",
|
||||
"index_google_calendar_events",
|
||||
"index_google_drive_files",
|
||||
"index_google_gmail_messages",
|
||||
"index_notion_pages",
|
||||
"index_obsidian_vault",
|
||||
"index_crawled_urls",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,676 +0,0 @@
|
|||
"""
|
||||
Obsidian connector indexer.
|
||||
|
||||
Indexes markdown notes from a local Obsidian vault.
|
||||
This connector is only available in self-hosted mode.
|
||||
|
||||
Implements 2-phase document status updates for real-time UI feedback:
|
||||
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||
- Phase 2: Process each document: pending → processing → ready/failed
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from collections.abc import Awaitable, Callable
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import config
|
||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
embed_text,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
generate_unique_identifier_hash,
|
||||
)
|
||||
|
||||
from .base import (
|
||||
build_document_metadata_string,
|
||||
check_document_by_unique_identifier,
|
||||
check_duplicate_document_by_hash,
|
||||
get_connector_by_id,
|
||||
get_current_timestamp,
|
||||
logger,
|
||||
safe_set_chunks,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
# Type hint for heartbeat callback
|
||||
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
|
||||
|
||||
# Heartbeat interval in seconds
|
||||
HEARTBEAT_INTERVAL_SECONDS = 30
|
||||
|
||||
|
||||
def parse_frontmatter(content: str) -> tuple[dict | None, str]:
|
||||
"""
|
||||
Parse YAML frontmatter from markdown content.
|
||||
|
||||
Args:
|
||||
content: The full markdown content
|
||||
|
||||
Returns:
|
||||
Tuple of (frontmatter dict or None, content without frontmatter)
|
||||
"""
|
||||
if not content.startswith("---"):
|
||||
return None, content
|
||||
|
||||
# Find the closing ---
|
||||
end_match = re.search(r"\n---\n", content[3:])
|
||||
if not end_match:
|
||||
return None, content
|
||||
|
||||
frontmatter_str = content[3 : end_match.start() + 3]
|
||||
remaining_content = content[end_match.end() + 3 :]
|
||||
|
||||
try:
|
||||
frontmatter = yaml.safe_load(frontmatter_str)
|
||||
return frontmatter, remaining_content.strip()
|
||||
except yaml.YAMLError:
|
||||
return None, content
|
||||
|
||||
|
||||
def extract_wiki_links(content: str) -> list[str]:
    """
    Extract [[wiki-style links]] from content.

    For ``[[target|alias]]`` only ``target`` is returned.

    Args:
        content: Markdown content

    Returns:
        List of unique link targets, in order of first appearance
    """
    # Match [[link]] or [[link|alias]]; the group captures the part before "|".
    pattern = r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]"
    matches = re.findall(pattern, content)
    # dict.fromkeys de-duplicates while preserving first-seen order, so the
    # result is stable across runs (a set would reorder it arbitrarily).
    return list(dict.fromkeys(matches))
|
||||
|
||||
|
||||
def extract_tags(content: str) -> list[str]:
    """
    Extract inline #tags from content.

    A tag is a ``#`` at the start of the text or after whitespace, followed by
    a letter and then any letters, digits, ``_``, ``/`` or ``-``. This function
    does not parse YAML; it only matches ``#tag`` tokens in the text it is given.

    Args:
        content: Markdown content

    Returns:
        List of unique tags (without # prefix), in order of first appearance
    """
    # Match #tag but not ## headers: the first character after "#" must be a
    # letter, and the "#" itself must not follow a non-space character.
    pattern = r"(?<!\S)#([a-zA-Z][a-zA-Z0-9_/-]*)"
    matches = re.findall(pattern, content)
    # dict.fromkeys de-duplicates while preserving first-seen order, so the
    # result is stable across runs (a set would reorder it arbitrarily).
    return list(dict.fromkeys(matches))
|
||||
|
||||
|
||||
def scan_vault(
|
||||
vault_path: str,
|
||||
exclude_folders: list[str] | None = None,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Scan an Obsidian vault for markdown files.
|
||||
|
||||
Args:
|
||||
vault_path: Path to the Obsidian vault
|
||||
exclude_folders: List of folder names to exclude
|
||||
|
||||
Returns:
|
||||
List of file info dicts with path, name, modified time
|
||||
"""
|
||||
if exclude_folders is None:
|
||||
exclude_folders = [".trash", ".obsidian", "templates"]
|
||||
|
||||
vault = Path(vault_path)
|
||||
if not vault.exists():
|
||||
raise ValueError(f"Vault path does not exist: {vault_path}")
|
||||
|
||||
files = []
|
||||
for md_file in vault.rglob("*.md"):
|
||||
# Check if file is in an excluded folder
|
||||
relative_path = md_file.relative_to(vault)
|
||||
parts = relative_path.parts
|
||||
|
||||
if any(excluded in parts for excluded in exclude_folders):
|
||||
continue
|
||||
|
||||
try:
|
||||
stat = md_file.stat()
|
||||
files.append(
|
||||
{
|
||||
"path": str(md_file),
|
||||
"relative_path": str(relative_path),
|
||||
"name": md_file.stem,
|
||||
"modified_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC),
|
||||
"created_at": datetime.fromtimestamp(stat.st_ctime, tz=UTC),
|
||||
"size": stat.st_size,
|
||||
}
|
||||
)
|
||||
except OSError as e:
|
||||
logger.warning(f"Could not stat file {md_file}: {e}")
|
||||
|
||||
return files
|
||||
|
||||
|
||||
async def index_obsidian_vault(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    update_last_indexed: bool = True,
    on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
    """
    Index notes from a local Obsidian vault.

    This indexer is only available in self-hosted mode as it requires
    direct file system access to the user's Obsidian vault.

    Work is done in two phases: Phase 1 reads every candidate file and creates
    (and commits) placeholder documents with pending status; Phase 2 processes
    the queued documents one at a time, moving each to processing and then to
    ready or failed.

    Args:
        session: Database session
        connector_id: ID of the Obsidian connector
        search_space_id: ID of the search space to store documents in
        user_id: ID of the user
        start_date: Start date for filtering (YYYY-MM-DD format) - optional
        end_date: End date for filtering (YYYY-MM-DD format) - optional
        update_last_indexed: Whether to update the last_indexed_at timestamp
        on_heartbeat_callback: Optional callback to update notification during long-running indexing.

    Returns:
        Tuple containing (number of documents indexed, message or None).
        On early exits and on caught exceptions the count is 0 and the message
        describes the error. On a completed run the message is a summary of
        duplicate/failed counts (e.g. "2 duplicate, 1 failed"), or None when
        there were none.
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Check if self-hosted mode (returns before any task log entry is created)
    if not config.is_self_hosted():
        return 0, "Obsidian connector is only available in self-hosted mode"

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="obsidian_vault_indexing",
        source="connector_indexing_task",
        message=f"Starting Obsidian vault indexing for connector {connector_id}",
        metadata={
            "connector_id": connector_id,
            "user_id": str(user_id),
            "start_date": start_date,
            "end_date": end_date,
        },
    )

    try:
        # Get the connector
        await task_logger.log_task_progress(
            log_entry,
            f"Retrieving Obsidian connector {connector_id} from database",
            {"stage": "connector_retrieval"},
        )

        connector = await get_connector_by_id(
            session, connector_id, SearchSourceConnectorType.OBSIDIAN_CONNECTOR
        )

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found or is not an Obsidian connector",
                "Connector not found",
                {"error_type": "ConnectorNotFound"},
            )
            return (
                0,
                f"Connector with ID {connector_id} not found or is not an Obsidian connector",
            )

        # Get vault path from connector config
        vault_path = connector.config.get("vault_path")
        if not vault_path:
            await task_logger.log_task_failure(
                log_entry,
                "Vault path not configured for this connector",
                "Missing vault path",
                {"error_type": "MissingVaultPath"},
            )
            return 0, "Vault path not configured for this connector"

        # Validate vault path exists
        if not os.path.exists(vault_path):
            await task_logger.log_task_failure(
                log_entry,
                f"Vault path does not exist: {vault_path}",
                "Vault path not found",
                {"error_type": "VaultNotFound", "vault_path": vault_path},
            )
            return 0, f"Vault path does not exist: {vault_path}"

        # Get configuration options
        exclude_folders = connector.config.get(
            "exclude_folders", [".trash", ".obsidian", "templates"]
        )
        # Fall back to the last path component when no vault name is configured.
        vault_name = connector.config.get("vault_name") or os.path.basename(vault_path)

        await task_logger.log_task_progress(
            log_entry,
            f"Scanning Obsidian vault: {vault_name}",
            {"stage": "vault_scan", "vault_path": vault_path},
        )

        # Scan vault for markdown files
        try:
            files = scan_vault(vault_path, exclude_folders)
        except Exception as e:
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to scan vault: {e}",
                "Vault scan error",
                {"error_type": "VaultScanError"},
            )
            return 0, f"Failed to scan vault: {e}"

        logger.info(f"Found {len(files)} markdown files in vault")

        await task_logger.log_task_progress(
            log_entry,
            f"Found {len(files)} markdown files to process",
            {"stage": "files_discovered", "file_count": len(files)},
        )

        # Filter by date if provided (handle "undefined" string from frontend)
        # Also handle inverted dates (start > end) by skipping filtering
        # A date string that does not match %Y-%m-%d makes strptime raise
        # ValueError, which is handled by the outer `except Exception` below.
        start_dt = None
        end_dt = None

        if start_date and start_date != "undefined":
            start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=UTC)

        if end_date and end_date != "undefined":
            # Make end_date inclusive (end of day)
            end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=UTC)
            # NOTE(review): microsecond stays 0, so files modified after
            # 23:59:59.000000 on end_date are excluded by the <= filter below.
            end_dt = end_dt.replace(hour=23, minute=59, second=59)

        # Only apply date filtering if dates are valid and in correct order
        if start_dt and end_dt and start_dt > end_dt:
            logger.warning(
                f"start_date ({start_date}) is after end_date ({end_date}), skipping date filter"
            )
        else:
            # Filtering is on each file's modification time, not creation time.
            if start_dt:
                files = [f for f in files if f["modified_at"] >= start_dt]
                logger.info(
                    f"After start_date filter ({start_date}): {len(files)} files"
                )
            if end_dt:
                files = [f for f in files if f["modified_at"] <= end_dt]
                logger.info(f"After end_date filter ({end_date}): {len(files)} files")

        logger.info(f"Processing {len(files)} files after date filtering")

        indexed_count = 0
        skipped_count = 0
        failed_count = 0
        duplicate_content_count = 0

        # Heartbeat tracking - update notification periodically to prevent appearing stuck
        last_heartbeat_time = time.time()

        # =======================================================================
        # PHASE 1: Analyze all files, create pending documents
        # This makes ALL documents visible in the UI immediately with pending status
        # =======================================================================
        files_to_process = []  # List of dicts with document and file data
        new_documents_created = False

        for file_info in files:
            try:
                file_path = file_info["path"]
                relative_path = file_info["relative_path"]

                # Read file content
                try:
                    with open(file_path, encoding="utf-8") as f:
                        content = f.read()
                except UnicodeDecodeError:
                    logger.warning(f"Could not decode file {file_path}, skipping")
                    skipped_count += 1
                    continue

                if not content.strip():
                    logger.debug(f"Empty file {file_path}, skipping")
                    skipped_count += 1
                    continue

                # Parse frontmatter and extract metadata
                # Links and tags are extracted from the full text, including
                # any frontmatter block, not just from body_content.
                frontmatter, body_content = parse_frontmatter(content)
                wiki_links = extract_wiki_links(content)
                tags = extract_tags(content)

                # Get title from frontmatter or filename
                title = file_info["name"]
                if frontmatter:
                    # NOTE(review): assumes frontmatter is a dict; any other
                    # truthy value would raise here and be counted as a
                    # Phase 1 failure by the except below — confirm upstream.
                    title = frontmatter.get("title", title)
                    # Also extract tags from frontmatter
                    fm_tags = frontmatter.get("tags", [])
                    if isinstance(fm_tags, list):
                        tags = list({*tags, *fm_tags})
                    elif isinstance(fm_tags, str):
                        tags = list({*tags, fm_tags})

                # Generate unique identifier using vault name and relative path
                unique_identifier = f"{vault_name}:{relative_path}"
                unique_identifier_hash = generate_unique_identifier_hash(
                    DocumentType.OBSIDIAN_CONNECTOR,
                    unique_identifier,
                    search_space_id,
                )

                # Generate content hash (over the raw file text, frontmatter included)
                content_hash = generate_content_hash(content, search_space_id)

                # Check for existing document
                existing_document = await check_document_by_unique_identifier(
                    session, unique_identifier_hash
                )

                if existing_document:
                    # Document exists - check if content has changed
                    if existing_document.content_hash == content_hash:
                        # Ensure status is ready (might have been stuck in processing/pending)
                        # No commit happens here; the status change is persisted
                        # by a later commit on this session.
                        if not DocumentStatus.is_state(
                            existing_document.status, DocumentStatus.READY
                        ):
                            existing_document.status = DocumentStatus.ready()
                        logger.debug(f"Note {title} unchanged, skipping")
                        skipped_count += 1
                        continue

                    # Queue existing document for update (will be set to processing in Phase 2)
                    files_to_process.append(
                        {
                            "document": existing_document,
                            "is_new": False,
                            "file_info": file_info,
                            "content": content,
                            "body_content": body_content,
                            "frontmatter": frontmatter,
                            "wiki_links": wiki_links,
                            "tags": tags,
                            "title": title,
                            "relative_path": relative_path,
                            "content_hash": content_hash,
                            "unique_identifier_hash": unique_identifier_hash,
                        }
                    )
                    continue

                # Document doesn't exist by unique_identifier_hash
                # Check if a document with the same content_hash exists (from another connector)
                with session.no_autoflush:
                    duplicate_by_content = await check_duplicate_document_by_hash(
                        session, content_hash
                    )

                if duplicate_by_content:
                    logger.info(
                        f"Obsidian note {title} already indexed by another connector "
                        f"(existing document ID: {duplicate_by_content.id}, "
                        f"type: {duplicate_by_content.document_type}). Skipping."
                    )
                    # Duplicates are counted both as duplicates and as skipped.
                    duplicate_content_count += 1
                    skipped_count += 1
                    continue

                # Create new document with PENDING status (visible in UI immediately)
                document = Document(
                    search_space_id=search_space_id,
                    title=title,
                    document_type=DocumentType.OBSIDIAN_CONNECTOR,
                    document_metadata={
                        "vault_name": vault_name,
                        "file_path": relative_path,
                        "connector_id": connector_id,
                    },
                    content="Pending...",  # Placeholder until processed
                    content_hash=unique_identifier_hash,  # Temporary unique value - updated when ready
                    unique_identifier_hash=unique_identifier_hash,
                    embedding=None,
                    chunks=[],  # Empty at creation - safe for async
                    status=DocumentStatus.pending(),  # Pending until processing starts
                    updated_at=get_current_timestamp(),
                    created_by_id=user_id,
                    connector_id=connector_id,
                )
                session.add(document)
                new_documents_created = True

                files_to_process.append(
                    {
                        "document": document,
                        "is_new": True,
                        "file_info": file_info,
                        "content": content,
                        "body_content": body_content,
                        "frontmatter": frontmatter,
                        "wiki_links": wiki_links,
                        "tags": tags,
                        "title": title,
                        "relative_path": relative_path,
                        "content_hash": content_hash,
                        "unique_identifier_hash": unique_identifier_hash,
                    }
                )

            except Exception as e:
                # One bad file must not abort the scan: log, count, move on.
                logger.exception(
                    f"Error in Phase 1 for file {file_info.get('path', 'unknown')}: {e}"
                )
                failed_count += 1
                continue

        # Commit all pending documents - they all appear in UI now
        if new_documents_created:
            logger.info(
                f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents"
            )
            await session.commit()

        # =======================================================================
        # PHASE 2: Process each document one by one
        # Each document transitions: pending → processing → ready/failed
        # =======================================================================
        logger.info(f"Phase 2: Processing {len(files_to_process)} documents")

        # Get LLM for summarization
        long_context_llm = await get_user_long_context_llm(
            session, user_id, search_space_id
        )

        for item in files_to_process:
            # Send heartbeat periodically
            # The interval is only checked at the start of each item, so a
            # single slow document can delay the next heartbeat.
            if on_heartbeat_callback:
                current_time = time.time()
                if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
                    await on_heartbeat_callback(indexed_count)
                    last_heartbeat_time = current_time

            document = item["document"]
            try:
                # Set to PROCESSING and commit - shows "processing" in UI for THIS document only
                document.status = DocumentStatus.processing()
                await session.commit()

                # Extract data from item
                title = item["title"]
                relative_path = item["relative_path"]
                content = item["content"]
                body_content = item["body_content"]
                frontmatter = item["frontmatter"]
                wiki_links = item["wiki_links"]
                tags = item["tags"]
                content_hash = item["content_hash"]
                file_info = item["file_info"]

                # Build metadata
                document_metadata = {
                    "vault_name": vault_name,
                    "file_path": relative_path,
                    "tags": tags,
                    "outgoing_links": wiki_links,
                    "frontmatter": frontmatter,
                    "modified_at": file_info["modified_at"].isoformat(),
                    "created_at": file_info["created_at"].isoformat(),
                    "word_count": len(body_content.split()),
                }

                # Build document content with metadata
                # NOTE(review): frontmatter tags are merged in Phase 1 without
                # a type check; a non-string YAML tag would presumably make the
                # ', '.join(tags) below raise and mark the note failed — confirm.
                metadata_sections = [
                    (
                        "METADATA",
                        [
                            f"Title: {title}",
                            f"Vault: {vault_name}",
                            f"Path: {relative_path}",
                            f"Tags: {', '.join(tags) if tags else 'None'}",
                            f"Links to: {', '.join(wiki_links) if wiki_links else 'None'}",
                        ],
                    ),
                    ("CONTENT", [body_content]),
                ]
                document_string = build_document_metadata_string(metadata_sections)

                # Generate summary (only when an LLM is available and the
                # connector has summaries enabled; otherwise it stays empty)
                summary_content = ""
                if long_context_llm and connector.enable_summary:
                    summary_content, _ = await generate_document_summary(
                        document_string,
                        long_context_llm,
                        document_metadata,
                    )

                # Generate embedding
                embedding = embed_text(document_string)

                # Add URL and summary to metadata
                document_metadata["url"] = f"obsidian://{vault_name}/{relative_path}"
                document_metadata["summary"] = summary_content
                document_metadata["connector_id"] = connector_id

                # Create chunks
                chunks = await create_document_chunks(document_string)

                # Update document to READY with actual content
                # content_hash replaces the Phase 1 placeholder value here.
                document.title = title
                document.content = document_string
                document.content_hash = content_hash
                document.embedding = embedding
                document.document_metadata = document_metadata
                await safe_set_chunks(session, document, chunks)
                document.updated_at = get_current_timestamp()
                document.status = DocumentStatus.ready()

                indexed_count += 1

                # Batch commit every 10 documents (for ready status updates)
                if indexed_count % 10 == 0:
                    logger.info(
                        f"Committing batch: {indexed_count} Obsidian notes processed so far"
                    )
                    await session.commit()

            except Exception as e:
                logger.exception(
                    f"Error processing file {item.get('file_info', {}).get('path', 'unknown')}: {e}"
                )
                # Mark document as failed with reason (visible in UI)
                # There is no commit in this handler; the failed status is
                # persisted by the next commit issued on this session.
                try:
                    document.status = DocumentStatus.failed(str(e))
                    document.updated_at = get_current_timestamp()
                except Exception as status_error:
                    logger.error(
                        f"Failed to update document status to failed: {status_error}"
                    )
                failed_count += 1
                continue

        # CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
        await update_connector_last_indexed(session, connector, update_last_indexed)

        # Final commit for any remaining documents not yet committed in batches
        logger.info(f"Final commit: Total {indexed_count} Obsidian notes processed")
        try:
            await session.commit()
            logger.info(
                "Successfully committed all Obsidian document changes to database"
            )
        except Exception as e:
            # Handle any remaining integrity errors gracefully (race conditions, etc.)
            # Detection is by substring match on the exception message.
            if (
                "duplicate key value violates unique constraint" in str(e).lower()
                or "uniqueviolationerror" in str(e).lower()
            ):
                logger.warning(
                    f"Duplicate content_hash detected during final commit. "
                    f"This may occur if the same note was indexed by multiple connectors. "
                    f"Rolling back and continuing. Error: {e!s}"
                )
                await session.rollback()
                # Don't fail the entire task - some documents may have been successfully indexed
            else:
                raise

        # Build warning message if there were issues
        warning_parts = []
        if duplicate_content_count > 0:
            warning_parts.append(f"{duplicate_content_count} duplicate")
        if failed_count > 0:
            warning_parts.append(f"{failed_count} failed")
        warning_message = ", ".join(warning_parts) if warning_parts else None

        # Skipped and failed notes are not included in the returned count.
        total_processed = indexed_count

        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Obsidian vault indexing for connector {connector_id}",
            {
                "notes_processed": total_processed,
                "documents_indexed": indexed_count,
                "documents_skipped": skipped_count,
                "documents_failed": failed_count,
                "duplicate_content_count": duplicate_content_count,
            },
        )

        logger.info(
            f"Obsidian vault indexing completed: {indexed_count} ready, "
            f"{skipped_count} skipped, {failed_count} failed "
            f"({duplicate_content_count} duplicate content)"
        )
        return total_processed, warning_message

    except SQLAlchemyError as e:
        # Database errors: roll the session back before logging the failure.
        logger.exception(f"Database error during Obsidian indexing: {e}")
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Obsidian indexing: {e}",
            "Database error",
            {"error_type": "SQLAlchemyError"},
        )
        return 0, f"Database error: {e}"

    except Exception as e:
        # Any other error is reported to the caller as (0, message), not raised.
        logger.exception(f"Error during Obsidian indexing: {e}")
        await task_logger.log_task_failure(
            log_entry,
            f"Error during Obsidian indexing: {e}",
            "Unexpected error",
            {"error_type": type(e).__name__},
        )
        return 0, str(e)
|
||||
|
|
@ -24,7 +24,6 @@ CONNECTOR_TASK_MAP = {
|
|||
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents",
|
||||
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: "index_crawled_urls",
|
||||
SearchSourceConnectorType.BOOKSTACK_CONNECTOR: "index_bookstack_pages",
|
||||
SearchSourceConnectorType.OBSIDIAN_CONNECTOR: "index_obsidian_vault",
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -81,7 +80,6 @@ def create_periodic_schedule(
|
|||
index_elasticsearch_documents_task,
|
||||
index_github_repos_task,
|
||||
index_notion_pages_task,
|
||||
index_obsidian_vault_task,
|
||||
)
|
||||
|
||||
task_map = {
|
||||
|
|
@ -91,7 +89,6 @@ def create_periodic_schedule(
|
|||
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
|
||||
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
|
||||
SearchSourceConnectorType.BOOKSTACK_CONNECTOR: index_bookstack_pages_task,
|
||||
SearchSourceConnectorType.OBSIDIAN_CONNECTOR: index_obsidian_vault_task,
|
||||
}
|
||||
|
||||
# Trigger the first run immediately
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue