"""Add local folder connector enums and document_versions table

Revision ID: 117
Revises: 116
"""

from collections.abc import Sequence

import sqlalchemy as sa

from alembic import op

revision: str = "117"
down_revision: str | None = "116"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

# Logical replication publication consumed by the Zero sync layer; new tables
# must be added to it so changes are streamed to clients.
PUBLICATION_NAME = "zero_publication"


def upgrade() -> None:
    """Add LOCAL_FOLDER enum labels, create document_versions, and register it
    with the Zero publication.

    Every step is written to be idempotent (guarded by existence checks or
    IF NOT EXISTS) so re-running a partially applied migration is safe.
    """
    conn = op.get_bind()

    # Add LOCAL_FOLDER_CONNECTOR to searchsourceconnectortype enum.
    # ALTER TYPE ... ADD VALUE has no IF NOT EXISTS on older Postgres, so the
    # DO-block checks pg_enum first to stay idempotent.
    op.execute(
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM pg_type t
                JOIN pg_enum e ON t.oid = e.enumtypid
                WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'LOCAL_FOLDER_CONNECTOR'
            ) THEN
                ALTER TYPE searchsourceconnectortype ADD VALUE 'LOCAL_FOLDER_CONNECTOR';
            END IF;
        END
        $$;
        """
    )

    # Add LOCAL_FOLDER_FILE to documenttype enum (same idempotent pattern).
    op.execute(
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM pg_type t
                JOIN pg_enum e ON t.oid = e.enumtypid
                WHERE t.typname = 'documenttype' AND e.enumlabel = 'LOCAL_FOLDER_FILE'
            ) THEN
                ALTER TYPE documenttype ADD VALUE 'LOCAL_FOLDER_FILE';
            END IF;
        END
        $$;
        """
    )

    # Create document_versions table only if a previous (partial) run did not
    # already create it.
    table_exists = conn.execute(
        sa.text(
            "SELECT 1 FROM information_schema.tables WHERE table_name = 'document_versions'"
        )
    ).fetchone()
    if not table_exists:
        op.create_table(
            "document_versions",
            sa.Column("id", sa.Integer(), nullable=False, autoincrement=True),
            sa.Column("document_id", sa.Integer(), nullable=False),
            sa.Column("version_number", sa.Integer(), nullable=False),
            sa.Column("source_markdown", sa.Text(), nullable=True),
            sa.Column("content_hash", sa.String(), nullable=False),
            sa.Column("title", sa.String(), nullable=True),
            sa.Column(
                "created_at",
                sa.TIMESTAMP(timezone=True),
                server_default=sa.text("now()"),
                nullable=False,
            ),
            sa.ForeignKeyConstraint(
                ["document_id"],
                ["documents.id"],
                ondelete="CASCADE",
            ),
            sa.PrimaryKeyConstraint("id"),
            sa.UniqueConstraint(
                "document_id",
                "version_number",
                name="uq_document_version",
            ),
        )

    # Indexes use IF NOT EXISTS so they are safe regardless of whether the
    # table was just created or already existed.
    op.execute(
        "CREATE INDEX IF NOT EXISTS ix_document_versions_document_id "
        "ON document_versions (document_id)"
    )
    op.execute(
        "CREATE INDEX IF NOT EXISTS ix_document_versions_created_at "
        "ON document_versions (created_at)"
    )

    # Add document_versions to the Zero publication, but only when the
    # publication exists (it may not in all deployments) and the table is not
    # already a member.
    pub_exists = conn.execute(
        sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"),
        {"name": PUBLICATION_NAME},
    ).fetchone()
    if pub_exists:
        already_in_pub = conn.execute(
            sa.text(
                "SELECT 1 FROM pg_publication_tables "
                "WHERE pubname = :name AND tablename = 'document_versions'"
            ),
            {"name": PUBLICATION_NAME},
        ).fetchone()
        if not already_in_pub:
            op.execute(
                f"ALTER PUBLICATION {PUBLICATION_NAME} ADD TABLE document_versions"
            )


def downgrade() -> None:
    """Drop document_versions and detach it from the publication.

    NOTE: the enum labels added in upgrade() are intentionally left in place —
    Postgres cannot remove enum values without rebuilding the type.
    """
    conn = op.get_bind()

    # Remove from publication first so replication does not reference a
    # dropped table.
    pub_exists = conn.execute(
        sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"),
        {"name": PUBLICATION_NAME},
    ).fetchone()
    if pub_exists:
        op.execute(
            f"ALTER PUBLICATION {PUBLICATION_NAME} DROP TABLE IF EXISTS document_versions"
        )

    op.execute("DROP INDEX IF EXISTS ix_document_versions_created_at")
    op.execute("DROP INDEX IF EXISTS ix_document_versions_document_id")
    op.execute("DROP TABLE IF EXISTS document_versions")
class DocumentVersion(BaseModel, TimestampMixin):
    """Immutable snapshot of a Document's content at a point in time.

    One row per (document_id, version_number) pair, enforced by the
    ``uq_document_version`` unique constraint. Rows are removed together with
    their parent document via the ON DELETE CASCADE foreign key.
    """

    __tablename__ = "document_versions"
    __table_args__ = (
        UniqueConstraint("document_id", "version_number", name="uq_document_version"),
    )

    # Parent document; indexed because versions are always queried per document.
    document_id = Column(
        Integer,
        ForeignKey("documents.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    # Per-document version counter (unique together with document_id).
    version_number = Column(Integer, nullable=False)
    # Full markdown body as it existed when the snapshot was taken.
    source_markdown = Column(Text, nullable=True)
    # Hash of the snapshotted content — presumably used for change detection /
    # deduplication by the versioning utilities; TODO confirm against
    # app.utils.document_versioning.
    content_hash = Column(String, nullable=False)
    # Document title at snapshot time (may be absent).
    title = Column(String, nullable=True)

    # Convenience relationship; adds a ``versions`` collection on Document.
    document = relationship("Document", backref="versions")
# ====================================================================
# Version History Endpoints
# ====================================================================


async def _get_document_with_permission(
    session: AsyncSession,
    user: User,
    document_id: int,
    permission: Permission,
) -> Document:
    """Load a document by id and enforce ``permission`` for ``user``.

    Raises:
        HTTPException(404): when no document with ``document_id`` exists.
        Whatever ``check_permission`` raises when access is denied.
    """
    document = (
        await session.execute(select(Document).where(Document.id == document_id))
    ).scalar_one_or_none()
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    await check_permission(session, user, document.search_space_id, permission)
    return document


async def _get_version_or_404(
    session: AsyncSession,
    document_id: int,
    version_number: int,
) -> DocumentVersion:
    """Load a specific DocumentVersion or raise 404."""
    version = (
        await session.execute(
            select(DocumentVersion).where(
                DocumentVersion.document_id == document_id,
                DocumentVersion.version_number == version_number,
            )
        )
    ).scalar_one_or_none()
    if not version:
        raise HTTPException(status_code=404, detail="Version not found")
    return version


@router.get("/documents/{document_id}/versions")
async def list_document_versions(
    document_id: int,
    session: AsyncSession = Depends(get_async_session),
    user: User = Depends(current_active_user),
):
    """List all versions for a document, ordered by version_number descending.

    Returns lightweight metadata only; ``source_markdown`` is deliberately
    omitted to keep the listing cheap — fetch a single version for content.
    """
    await _get_document_with_permission(session, user, document_id, Permission.READ)

    versions = (
        (
            await session.execute(
                select(DocumentVersion)
                .where(DocumentVersion.document_id == document_id)
                .order_by(DocumentVersion.version_number.desc())
            )
        )
        .scalars()
        .all()
    )

    return [
        {
            "version_number": v.version_number,
            "title": v.title,
            "content_hash": v.content_hash,
            "created_at": v.created_at.isoformat() if v.created_at else None,
        }
        for v in versions
    ]


@router.get("/documents/{document_id}/versions/{version_number}")
async def get_document_version(
    document_id: int,
    version_number: int,
    session: AsyncSession = Depends(get_async_session),
    user: User = Depends(current_active_user),
):
    """Get full version content including source_markdown."""
    await _get_document_with_permission(session, user, document_id, Permission.READ)
    version = await _get_version_or_404(session, document_id, version_number)

    return {
        "version_number": version.version_number,
        "title": version.title,
        "content_hash": version.content_hash,
        "source_markdown": version.source_markdown,
        "created_at": version.created_at.isoformat() if version.created_at else None,
    }


@router.post("/documents/{document_id}/versions/{version_number}/restore")
async def restore_document_version(
    document_id: int,
    version_number: int,
    session: AsyncSession = Depends(get_async_session),
    user: User = Depends(current_active_user),
):
    """Restore a previous version: snapshot current state, then overwrite document content.

    Requires WRITE permission on the document's search space. The current
    content is snapshotted first so a restore is itself undoable.
    """
    document = await _get_document_with_permission(
        session, user, document_id, Permission.WRITE
    )
    version = await _get_version_or_404(session, document_id, version_number)

    # Snapshot current state before restoring so the pre-restore content
    # becomes the latest version.
    from app.utils.document_versioning import create_version_snapshot

    await create_version_snapshot(session, document)

    # Restore the version's content onto the document. Title only overwrites
    # when the snapshot actually captured one.
    document.source_markdown = version.source_markdown
    document.title = version.title or document.title
    document.content_needs_reindexing = True
    await session.commit()

    return {
        "message": f"Restored version {version_number}",
        "document_id": document_id,
        "restored_version": version_number,
    }
Non-OAuth connectors (BookStack, GitHub, etc.) are limited to one per search spa import asyncio import logging +import os from contextlib import suppress from datetime import UTC, datetime, timedelta from typing import Any @@ -55,23 +56,12 @@ from app.schemas import ( ) from app.services.composio_service import ComposioService, get_composio_service from app.services.notification_service import NotificationService -from app.tasks.connector_indexers import ( - index_airtable_records, - index_clickup_tasks, - index_confluence_pages, - index_crawled_urls, - index_discord_messages, - index_elasticsearch_documents, - index_github_repos, - index_google_calendar_events, - index_google_gmail_messages, - index_jira_issues, - index_linear_issues, - index_luma_events, - index_notion_pages, - index_slack_messages, -) from app.users import current_active_user + +# NOTE: connector indexer functions are imported lazily inside each +# ``run_*_indexing`` helper to break a circular import cycle: +# connector_indexers.__init__ → airtable_indexer → airtable_history +# → app.routes.__init__ → this file → connector_indexers (not ready yet) from app.utils.connector_naming import ensure_unique_connector_name from app.utils.indexing_locks import ( acquire_connector_indexing_lock, @@ -1180,6 +1170,24 @@ async def index_connector_content( ) response_message = "Obsidian vault indexing started in the background." 
class IndexFileRequest(BaseModel):
    # Absolute path on the server host; validated against the connector's
    # configured folder before any indexing is scheduled.
    file_path: str = Field(..., description="Absolute path to the file to index")


@router.post(
    "/search-source-connectors/{connector_id}/index-file",
    response_model=dict[str, Any],
)
async def index_single_file(
    connector_id: int,
    body: IndexFileRequest,
    session: AsyncSession = Depends(get_async_session),
    user: User = Depends(current_active_user),
):
    """Index a single file from a local folder connector (chokidar real-time trigger).

    Only available in self-hosted mode. The caller must hold WRITE permission
    on the connector's search space, and the requested file must resolve
    (after following symlinks) to a path inside the connector's configured
    folder.

    Raises:
        HTTPException(400): not self-hosted, or connector has no folder_path.
        HTTPException(403): path escapes the configured folder or is invalid.
        HTTPException(404): connector missing or not a local folder connector.
    """
    from app.config import config as app_config
    from app.tasks.celery_tasks.connector_tasks import index_local_folder_task

    if not app_config.is_self_hosted():
        raise HTTPException(
            status_code=400,
            detail="Local folder connector is only available in self-hosted mode",
        )

    result = await session.execute(
        select(SearchSourceConnector).filter(
            SearchSourceConnector.id == connector_id,
            SearchSourceConnector.connector_type
            == SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR,
        )
    )
    connector = result.scalars().first()
    if not connector:
        raise HTTPException(status_code=404, detail="Local folder connector not found")

    await check_permission(session, user, connector.search_space_id, Permission.WRITE)

    folder_path = connector.config.get("folder_path", "")
    # Bug fix: an empty/unset folder_path previously fell through to
    # os.path.realpath(""), which resolves to the process CWD — the
    # containment check would then validate against the wrong directory.
    if not folder_path:
        raise HTTPException(
            status_code=400,
            detail="Connector has no folder_path configured",
        )

    # Security: resolve symlinks and verify the file is inside folder_path.
    try:
        resolved_file = os.path.realpath(body.file_path)
        resolved_folder = os.path.realpath(folder_path)
    except (OSError, ValueError) as e:
        raise HTTPException(
            status_code=403,
            detail="Invalid file path",
        ) from e

    inside_folder = resolved_file == resolved_folder or resolved_file.startswith(
        resolved_folder + os.sep
    )
    if not inside_folder:
        raise HTTPException(
            status_code=403,
            detail="File path is outside the configured folder",
        )

    # Reuse the folder-indexing Celery task, narrowed to a single file via
    # target_file_path; date range is irrelevant for a one-file reindex.
    index_local_folder_task.delay(
        connector_id,
        connector.search_space_id,
        str(user.id),
        None,
        None,
        target_file_path=resolved_file,
    )

    return {
        "message": "Single file indexing started",
        "connector_id": connector_id,
        "file_path": body.file_path,
    }
""" + from app.tasks.connector_indexers import index_notion_pages + async with async_session_maker() as session: await _run_indexing_with_notifications( session=session, @@ -1858,6 +1940,8 @@ async def run_notion_indexing( start_date: Start date for indexing end_date: End date for indexing """ + from app.tasks.connector_indexers import index_notion_pages + await _run_indexing_with_notifications( session=session, connector_id=connector_id, @@ -1910,6 +1994,8 @@ async def run_github_indexing( start_date: Start date for indexing end_date: End date for indexing """ + from app.tasks.connector_indexers import index_github_repos + await _run_indexing_with_notifications( session=session, connector_id=connector_id, @@ -1961,6 +2047,8 @@ async def run_linear_indexing( start_date: Start date for indexing end_date: End date for indexing """ + from app.tasks.connector_indexers import index_linear_issues + await _run_indexing_with_notifications( session=session, connector_id=connector_id, @@ -2011,6 +2099,8 @@ async def run_discord_indexing( start_date: Start date for indexing end_date: End date for indexing """ + from app.tasks.connector_indexers import index_discord_messages + await _run_indexing_with_notifications( session=session, connector_id=connector_id, @@ -2113,6 +2203,8 @@ async def run_jira_indexing( start_date: Start date for indexing end_date: End date for indexing """ + from app.tasks.connector_indexers import index_jira_issues + await _run_indexing_with_notifications( session=session, connector_id=connector_id, @@ -2166,6 +2258,8 @@ async def run_confluence_indexing( start_date: Start date for indexing end_date: End date for indexing """ + from app.tasks.connector_indexers import index_confluence_pages + await _run_indexing_with_notifications( session=session, connector_id=connector_id, @@ -2217,6 +2311,8 @@ async def run_clickup_indexing( start_date: Start date for indexing end_date: End date for indexing """ + from app.tasks.connector_indexers import 
async def run_local_folder_indexing_with_new_session(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None,
    end_date: str | None,
    target_file_path: str | None = None,
):
    """Run local folder indexing inside a dedicated database session.

    Creating the session here (rather than reusing the request session)
    prevents session leaks from long-running background work.

    Args:
        connector_id: Local folder connector to index.
        search_space_id: Search space receiving the documents.
        user_id: Acting user's id (stringified UUID).
        start_date: Optional lower bound for indexing (None = unbounded —
            single-file triggers pass None).
        end_date: Optional upper bound for indexing (None = unbounded).
        target_file_path: When set, restrict indexing to this one file.
    """
    logger.info(
        f"Background task started: Indexing local folder connector {connector_id} into space {search_space_id}"
    )
    async with async_session_maker() as session:
        await run_local_folder_indexing(
            session,
            connector_id,
            search_space_id,
            user_id,
            start_date,
            end_date,
            target_file_path=target_file_path,
        )
    logger.info(
        f"Background task finished: Indexing local folder connector {connector_id}"
    )


async def run_local_folder_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None,
    end_date: str | None,
    target_file_path: str | None = None,
):
    """Background task to run local folder indexing with notifications.

    Args:
        session: Database session to use.
        connector_id: Local folder connector to index.
        search_space_id: Search space receiving the documents.
        user_id: Acting user's id (stringified UUID).
        start_date: Optional lower bound for indexing (None = unbounded).
        end_date: Optional upper bound for indexing (None = unbounded).
        target_file_path: When set, restrict indexing to this one file.
    """
    # Imported lazily to break the connector_indexers ↔ routes import cycle
    # (see the module-level NOTE near the imports).
    from app.tasks.connector_indexers import index_local_folder

    def _index_with_target(
        session,
        connector_id,
        search_space_id,
        user_id,
        start_date,
        end_date,
        update_last_indexed,
        on_heartbeat_callback,
    ):
        # Forward the generic indexing-callback arguments, adding the
        # optional single-file target captured from the enclosing scope.
        return index_local_folder(
            session=session,
            connector_id=connector_id,
            search_space_id=search_space_id,
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            update_last_indexed=update_last_indexed,
            on_heartbeat_callback=on_heartbeat_callback,
            target_file_path=target_file_path,
        )

    await _run_indexing_with_notifications(
        session=session,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        start_date=start_date,
        end_date=end_date,
        indexing_function=_index_with_target,
        update_timestamp_func=_update_connector_timestamp_by_id,
        supports_heartbeat_callback=True,
    )