feat: add local folder connector and document versioning functionality

Anish Sarkar 2026-04-02 10:35:32 +05:30
parent c9ae4fcaca
commit 3b92e99d28
4 changed files with 459 additions and 16 deletions


@@ -0,0 +1,135 @@
"""Add local folder connector enums and document_versions table
Revision ID: 117
Revises: 116
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "117"
down_revision: str | None = "116"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
PUBLICATION_NAME = "zero_publication"
def upgrade() -> None:
conn = op.get_bind()
# Add LOCAL_FOLDER_CONNECTOR to searchsourceconnectortype enum
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'LOCAL_FOLDER_CONNECTOR'
) THEN
ALTER TYPE searchsourceconnectortype ADD VALUE 'LOCAL_FOLDER_CONNECTOR';
END IF;
END
$$;
"""
)
# Add LOCAL_FOLDER_FILE to documenttype enum
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'documenttype' AND e.enumlabel = 'LOCAL_FOLDER_FILE'
) THEN
ALTER TYPE documenttype ADD VALUE 'LOCAL_FOLDER_FILE';
END IF;
END
$$;
"""
)
# Create document_versions table
table_exists = conn.execute(
sa.text(
"SELECT 1 FROM information_schema.tables WHERE table_name = 'document_versions'"
)
).fetchone()
if not table_exists:
op.create_table(
"document_versions",
sa.Column("id", sa.Integer(), nullable=False, autoincrement=True),
sa.Column("document_id", sa.Integer(), nullable=False),
sa.Column("version_number", sa.Integer(), nullable=False),
sa.Column("source_markdown", sa.Text(), nullable=True),
sa.Column("content_hash", sa.String(), nullable=False),
sa.Column("title", sa.String(), nullable=True),
sa.Column(
"created_at",
sa.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["document_id"],
["documents.id"],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"document_id",
"version_number",
name="uq_document_version",
),
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_document_versions_document_id "
"ON document_versions (document_id)"
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_document_versions_created_at "
"ON document_versions (created_at)"
)
# Add document_versions to Zero publication
pub_exists = conn.execute(
sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"),
{"name": PUBLICATION_NAME},
).fetchone()
if pub_exists:
already_in_pub = conn.execute(
sa.text(
"SELECT 1 FROM pg_publication_tables "
"WHERE pubname = :name AND tablename = 'document_versions'"
),
{"name": PUBLICATION_NAME},
).fetchone()
if not already_in_pub:
op.execute(
f"ALTER PUBLICATION {PUBLICATION_NAME} ADD TABLE document_versions"
)
def downgrade() -> None:
    conn = op.get_bind()
    # NOTE: the enum labels added in upgrade() are intentionally left in place;
    # PostgreSQL has no ALTER TYPE ... DROP VALUE.
    # Remove from publication
pub_exists = conn.execute(
sa.text("SELECT 1 FROM pg_publication WHERE pubname = :name"),
{"name": PUBLICATION_NAME},
).fetchone()
if pub_exists:
op.execute(
f"ALTER PUBLICATION {PUBLICATION_NAME} DROP TABLE IF EXISTS document_versions"
)
op.execute("DROP INDEX IF EXISTS ix_document_versions_created_at")
op.execute("DROP INDEX IF EXISTS ix_document_versions_document_id")
op.execute("DROP TABLE IF EXISTS document_versions")


@@ -64,6 +64,7 @@ class DocumentType(StrEnum):
COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"
COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR"
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
LOCAL_FOLDER_FILE = "LOCAL_FOLDER_FILE"
# Native Google document types → their legacy Composio equivalents.
@@ -109,6 +110,7 @@ class SearchSourceConnectorType(StrEnum):
COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"
COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR"
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
LOCAL_FOLDER_CONNECTOR = "LOCAL_FOLDER_CONNECTOR"
class PodcastStatus(StrEnum):
@@ -1039,6 +1041,26 @@ class Document(BaseModel, TimestampMixin):
)
class DocumentVersion(BaseModel, TimestampMixin):
__tablename__ = "document_versions"
__table_args__ = (
UniqueConstraint("document_id", "version_number", name="uq_document_version"),
)
document_id = Column(
Integer,
ForeignKey("documents.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
version_number = Column(Integer, nullable=False)
source_markdown = Column(Text, nullable=True)
content_hash = Column(String, nullable=False)
title = Column(String, nullable=True)
document = relationship("Document", backref="versions")
class Chunk(BaseModel, TimestampMixin):
__tablename__ = "chunks"


@@ -10,6 +10,7 @@ from app.db import (
Chunk,
Document,
DocumentType,
DocumentVersion,
Permission,
SearchSpace,
SearchSpaceMembership,
@@ -1135,3 +1136,125 @@ async def delete_document(
raise HTTPException(
status_code=500, detail=f"Failed to delete document: {e!s}"
) from e
# ====================================================================
# Version History Endpoints
# ====================================================================
@router.get("/documents/{document_id}/versions")
async def list_document_versions(
document_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""List all versions for a document, ordered by version_number descending."""
document = (
await session.execute(select(Document).where(Document.id == document_id))
).scalar_one_or_none()
if not document:
raise HTTPException(status_code=404, detail="Document not found")
await check_permission(session, user, document.search_space_id, Permission.READ)
versions = (
await session.execute(
select(DocumentVersion)
.where(DocumentVersion.document_id == document_id)
.order_by(DocumentVersion.version_number.desc())
)
).scalars().all()
return [
{
"version_number": v.version_number,
"title": v.title,
"content_hash": v.content_hash,
"created_at": v.created_at.isoformat() if v.created_at else None,
}
for v in versions
]
@router.get("/documents/{document_id}/versions/{version_number}")
async def get_document_version(
document_id: int,
version_number: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Get full version content including source_markdown."""
document = (
await session.execute(select(Document).where(Document.id == document_id))
).scalar_one_or_none()
if not document:
raise HTTPException(status_code=404, detail="Document not found")
await check_permission(session, user, document.search_space_id, Permission.READ)
version = (
await session.execute(
select(DocumentVersion).where(
DocumentVersion.document_id == document_id,
DocumentVersion.version_number == version_number,
)
)
).scalar_one_or_none()
if not version:
raise HTTPException(status_code=404, detail="Version not found")
return {
"version_number": version.version_number,
"title": version.title,
"content_hash": version.content_hash,
"source_markdown": version.source_markdown,
"created_at": version.created_at.isoformat() if version.created_at else None,
}
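
For illustration, a hedged client-side walk of the two read endpoints above; the httpx client, base URL, and auth setup are assumptions, not part of this commit:

import httpx

async def show_history(client: httpx.AsyncClient, document_id: int) -> None:
    # Summaries come back newest-first, as ordered by the list endpoint.
    versions = (await client.get(f"/documents/{document_id}/versions")).json()
    for v in versions:
        print(v["version_number"], v["title"], v["content_hash"])
    if versions:
        # The detail endpoint adds the full source_markdown payload.
        newest = versions[0]["version_number"]
        detail = (
            await client.get(f"/documents/{document_id}/versions/{newest}")
        ).json()
        print(detail["source_markdown"])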
@router.post("/documents/{document_id}/versions/{version_number}/restore")
async def restore_document_version(
document_id: int,
version_number: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Restore a previous version: snapshot current state, then overwrite document content."""
document = (
await session.execute(
select(Document).where(Document.id == document_id)
)
).scalar_one_or_none()
if not document:
raise HTTPException(status_code=404, detail="Document not found")
await check_permission(session, user, document.search_space_id, Permission.WRITE)
version = (
await session.execute(
select(DocumentVersion).where(
DocumentVersion.document_id == document_id,
DocumentVersion.version_number == version_number,
)
)
).scalar_one_or_none()
if not version:
raise HTTPException(status_code=404, detail="Version not found")
# Snapshot current state before restoring
from app.utils.document_versioning import create_version_snapshot
await create_version_snapshot(session, document)
# Restore the version's content onto the document
document.source_markdown = version.source_markdown
document.title = version.title or document.title
document.content_needs_reindexing = True
await session.commit()
return {
"message": f"Restored version {version_number}",
"document_id": document_id,
"restored_version": version_number,
}
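
The restore flow leans on create_version_snapshot from app.utils.document_versioning, which is not part of this diff. Below is a hypothetical reconstruction of what such a helper plausibly does, inferred only from the call site above; the hashing choice and field mapping are guesses:

import hashlib

from sqlalchemy import func, select

async def create_version_snapshot(session, document) -> DocumentVersion:
    # Pick the next version_number allowed by the uq_document_version constraint.
    latest = (
        await session.execute(
            select(func.max(DocumentVersion.version_number)).where(
                DocumentVersion.document_id == document.id
            )
        )
    ).scalar()
    snapshot = DocumentVersion(
        document_id=document.id,
        version_number=(latest or 0) + 1,
        source_markdown=document.source_markdown,
        title=document.title,
        content_hash=hashlib.sha256(
            (document.source_markdown or "").encode()
        ).hexdigest(),
    )
    session.add(snapshot)
    return snapshot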


@@ -20,6 +20,7 @@ Non-OAuth connectors (BookStack, GitHub, etc.) are limited to one per search space
import asyncio
import logging
import os
from contextlib import suppress
from datetime import UTC, datetime, timedelta
from typing import Any
@@ -55,23 +56,12 @@ from app.schemas import (
)
from app.services.composio_service import ComposioService, get_composio_service
from app.services.notification_service import NotificationService
from app.tasks.connector_indexers import (
index_airtable_records,
index_clickup_tasks,
index_confluence_pages,
index_crawled_urls,
index_discord_messages,
index_elasticsearch_documents,
index_github_repos,
index_google_calendar_events,
index_google_gmail_messages,
index_jira_issues,
index_linear_issues,
index_luma_events,
index_notion_pages,
index_slack_messages,
)
from app.users import current_active_user
# NOTE: connector indexer functions are imported lazily inside each
# ``run_*_indexing`` helper to break a circular import cycle:
# connector_indexers.__init__ → airtable_indexer → airtable_history
# → app.routes.__init__ → this file → connector_indexers (not ready yet)
from app.utils.connector_naming import ensure_unique_connector_name
from app.utils.indexing_locks import (
acquire_connector_indexing_lock,
@@ -1180,6 +1170,24 @@ async def index_connector_content(
)
response_message = "Obsidian vault indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR:
from app.config import config as app_config
from app.tasks.celery_tasks.connector_tasks import index_local_folder_task
if not app_config.is_self_hosted():
raise HTTPException(
status_code=400,
detail="Local folder connector is only available in self-hosted mode",
)
logger.info(
f"Triggering local folder indexing for connector {connector_id} into search space {search_space_id}"
)
index_local_folder_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = "Local folder indexing started in the background."
elif (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
@@ -1312,6 +1320,76 @@ async def index_connector_content(
) from e
class IndexFileRequest(BaseModel):
file_path: str = Field(..., description="Absolute path to the file to index")
@router.post(
"/search-source-connectors/{connector_id}/index-file",
response_model=dict[str, Any],
)
async def index_single_file(
connector_id: int,
body: IndexFileRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Index a single file from a local folder connector (chokidar real-time trigger)."""
from app.config import config as app_config
from app.tasks.celery_tasks.connector_tasks import index_local_folder_task
if not app_config.is_self_hosted():
raise HTTPException(
status_code=400,
detail="Local folder connector is only available in self-hosted mode",
)
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(status_code=404, detail="Local folder connector not found")
await check_permission(session, user, connector.search_space_id, Permission.WRITE)
    folder_path = connector.config.get("folder_path", "")
    if not folder_path:
        # Without this guard, realpath("") resolves to the CWD and the
        # containment check below would compare against the wrong root.
        raise HTTPException(
            status_code=400,
            detail="Local folder connector has no folder_path configured",
        )
# Security: resolve symlinks and verify the file is inside folder_path
try:
resolved_file = os.path.realpath(body.file_path)
resolved_folder = os.path.realpath(folder_path)
if not resolved_file.startswith(resolved_folder + os.sep) and resolved_file != resolved_folder:
raise HTTPException(
status_code=403,
detail="File path is outside the configured folder",
)
    except (OSError, ValueError) as e:
        raise HTTPException(
            status_code=403,
            detail="Invalid file path",
        ) from e
index_local_folder_task.delay(
connector_id,
connector.search_space_id,
str(user.id),
None,
None,
target_file_path=resolved_file,
)
return {
"message": "Single file indexing started",
"connector_id": connector_id,
"file_path": body.file_path,
}
async def _update_connector_timestamp_by_id(session: AsyncSession, connector_id: int):
"""
Update the last_indexed_at timestamp for a connector by its ID.
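
To make the chokidar real-time trigger mentioned in the index_single_file docstring concrete: a hedged sketch of what a watcher-side caller would POST; the client and auth setup are assumptions:

import httpx

async def on_file_changed(
    client: httpx.AsyncClient, connector_id: int, path: str
) -> None:
    # The server re-resolves the path with os.path.realpath and returns
    # 403 if it falls outside the connector's configured folder_path.
    resp = await client.post(
        f"/search-source-connectors/{connector_id}/index-file",
        json={"file_path": path},
    )
    resp.raise_for_status()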
@@ -1378,6 +1456,8 @@ async def run_slack_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_slack_messages
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -1824,6 +1904,8 @@
Create a new session and run the Notion indexing task.
This prevents session leaks by creating a dedicated session for the background task.
"""
from app.tasks.connector_indexers import index_notion_pages
async with async_session_maker() as session:
await _run_indexing_with_notifications(
session=session,
@@ -1858,6 +1940,8 @@ async def run_notion_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_notion_pages
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -1910,6 +1994,8 @@ async def run_github_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_github_repos
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -1961,6 +2047,8 @@ async def run_linear_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_linear_issues
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2011,6 +2099,8 @@ async def run_discord_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_discord_messages
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2113,6 +2203,8 @@ async def run_jira_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_jira_issues
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2166,6 +2258,8 @@ async def run_confluence_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_confluence_pages
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2217,6 +2311,8 @@ async def run_clickup_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_clickup_tasks
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2268,6 +2364,8 @@ async def run_airtable_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_airtable_records
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2321,6 +2419,8 @@ async def run_google_calendar_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_google_calendar_events
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2370,6 +2470,7 @@ async def run_google_gmail_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_google_gmail_messages
# Create a wrapper function that calls index_google_gmail_messages with max_messages
async def gmail_indexing_wrapper(
@@ -2836,6 +2937,8 @@ async def run_luma_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_luma_events
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2888,6 +2991,8 @@ async def run_elasticsearch_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_elasticsearch_documents
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -2938,6 +3043,8 @@ async def run_web_page_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.connector_indexers import index_crawled_urls
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
@@ -3059,6 +3166,62 @@ async def run_obsidian_indexing(
)
async def run_local_folder_indexing_with_new_session(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
target_file_path: str | None = None,
):
"""Wrapper to run local folder indexing with its own database session."""
logger.info(
f"Background task started: Indexing local folder connector {connector_id} into space {search_space_id}"
)
async with async_session_maker() as session:
await run_local_folder_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date,
target_file_path=target_file_path,
)
logger.info(f"Background task finished: Indexing local folder connector {connector_id}")
async def run_local_folder_indexing(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
target_file_path: str | None = None,
):
"""Background task to run local folder indexing."""
from app.tasks.connector_indexers import index_local_folder
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=lambda session, connector_id, search_space_id, user_id,
start_date, end_date, update_last_indexed, on_heartbeat_callback: index_local_folder(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=update_last_indexed,
on_heartbeat_callback=on_heartbeat_callback,
target_file_path=target_file_path,
),
update_timestamp_func=_update_connector_timestamp_by_id,
supports_heartbeat_callback=True,
)
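
The multi-line lambda above exists only to thread target_file_path through _run_indexing_with_notifications, whose callback signature is fixed. Assuming that wrapper forwards its arguments under the same names, functools.partial would express the adapter more compactly; a sketch, not the code in this commit:

from functools import partial

# Bind the extra keyword up front; the notification wrapper supplies the rest.
indexing_function = partial(index_local_folder, target_file_path=target_file_path)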
async def run_composio_indexing_with_new_session(
connector_id: int,
search_space_id: int,