diff --git a/surfsense_backend/alembic/versions/87_add_document_connector_id.py b/surfsense_backend/alembic/versions/87_add_document_connector_id.py new file mode 100644 index 000000000..b075014ae --- /dev/null +++ b/surfsense_backend/alembic/versions/87_add_document_connector_id.py @@ -0,0 +1,171 @@ +"""Add connector_id column to documents table for linking documents to their source connector + +Revision ID: 87 +Revises: 86 +Create Date: 2026-02-02 + +Changes: +1. Add connector_id column (Integer, nullable, foreign key to search_source_connectors.id) +2. Create index on connector_id for efficient bulk deletion queries +3. SET NULL on delete - allows controlled cleanup in application code +4. Backfill existing documents based on document_type and search_space_id matching +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "87" +down_revision: str | None = "86" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add connector_id column to documents and backfill from existing connectors.""" + + # 1. Add connector_id column (nullable - for manually uploaded docs without connector) + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'connector_id' + ) THEN + ALTER TABLE documents + ADD COLUMN connector_id INTEGER; + END IF; + END$$; + """ + ) + + # 2. Create index on connector_id for efficient cleanup queries + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_documents_connector_id + ON documents (connector_id); + """ + ) + + # 3. Add foreign key constraint with ON DELETE SET NULL + # SET NULL allows us to delete documents in controlled batches before deleting connector + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_connector_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + ADD CONSTRAINT fk_documents_connector_id + FOREIGN KEY (connector_id) REFERENCES search_source_connectors(id) + ON DELETE SET NULL; + END IF; + END$$; + """ + ) + + # 4. Backfill existing documents with connector_id based on document_type matching + # This maps document types to their corresponding connector types + # Only backfills for documents in search spaces that have exactly one connector of that type + + # Map of document_type -> connector_type for backfilling + document_connector_mappings = [ + ("NOTION_CONNECTOR", "NOTION_CONNECTOR"), + ("SLACK_CONNECTOR", "SLACK_CONNECTOR"), + ("TEAMS_CONNECTOR", "TEAMS_CONNECTOR"), + ("GITHUB_CONNECTOR", "GITHUB_CONNECTOR"), + ("LINEAR_CONNECTOR", "LINEAR_CONNECTOR"), + ("DISCORD_CONNECTOR", "DISCORD_CONNECTOR"), + ("JIRA_CONNECTOR", "JIRA_CONNECTOR"), + ("CONFLUENCE_CONNECTOR", "CONFLUENCE_CONNECTOR"), + ("CLICKUP_CONNECTOR", "CLICKUP_CONNECTOR"), + ("GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR"), + ("GOOGLE_GMAIL_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR"), + ("GOOGLE_DRIVE_FILE", "GOOGLE_DRIVE_CONNECTOR"), + ("AIRTABLE_CONNECTOR", "AIRTABLE_CONNECTOR"), + ("LUMA_CONNECTOR", "LUMA_CONNECTOR"), + ("ELASTICSEARCH_CONNECTOR", "ELASTICSEARCH_CONNECTOR"), + ("BOOKSTACK_CONNECTOR", "BOOKSTACK_CONNECTOR"), + ("CIRCLEBACK", "CIRCLEBACK_CONNECTOR"), + ("OBSIDIAN_CONNECTOR", "OBSIDIAN_CONNECTOR"), + ("COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"), + ("COMPOSIO_GMAIL_CONNECTOR", "COMPOSIO_GMAIL_CONNECTOR"), + ("COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"), + ("CRAWLED_URL", "WEBCRAWLER_CONNECTOR"), + ] + + for doc_type, connector_type in document_connector_mappings: + # Backfill connector_id for documents where: + # 1. Document has this document_type + # 2. Document doesn't already have a connector_id + # 3. There's exactly one connector of this type in the same search space + # This safely handles most cases while avoiding ambiguity + op.execute( + f""" + UPDATE documents d + SET connector_id = ( + SELECT ssc.id + FROM search_source_connectors ssc + WHERE ssc.search_space_id = d.search_space_id + AND ssc.connector_type = '{connector_type}' + LIMIT 1 + ) + WHERE d.document_type = '{doc_type}' + AND d.connector_id IS NULL + AND EXISTS ( + SELECT 1 FROM search_source_connectors ssc + WHERE ssc.search_space_id = d.search_space_id + AND ssc.connector_type = '{connector_type}' + ); + """ + ) + + +def downgrade() -> None: + """Remove connector_id column from documents.""" + + # Drop foreign key constraint + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_connector_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + DROP CONSTRAINT fk_documents_connector_id; + END IF; + END$$; + """ + ) + + # Drop index + op.execute( + """ + DROP INDEX IF EXISTS ix_documents_connector_id; + """ + ) + + # Drop column + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'connector_id' + ) THEN + ALTER TABLE documents + DROP COLUMN connector_id; + END IF; + END$$; + """ + ) + diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index b77f5698e..fc13fd3eb 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -80,6 +80,7 @@ celery_app = Celery( "app.tasks.celery_tasks.blocknote_migration_tasks", "app.tasks.celery_tasks.document_reindex_tasks", "app.tasks.celery_tasks.stale_notification_cleanup_task", + "app.tasks.celery_tasks.connector_deletion_task", ], ) diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py index 2b470ae38..1964a4d45 100644 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -395,6 +395,7 @@ async def _process_gmail_message_batch( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index 960757901..78ff360ca 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -443,6 +443,7 @@ async def index_composio_google_calendar( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index b4b3e7ee6..369f5d8b3 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -1248,7 +1248,6 @@ async def _process_single_drive_file( "file_name": file_name, "FILE_NAME": file_name, # For compatibility "mime_type": mime_type, - "connector_id": connector_id, "toolkit_id": "googledrive", "source": "composio", }, @@ -1259,6 +1258,7 @@ async def _process_single_drive_file( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 61b427970..39a92f95f 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -25,6 +25,7 @@ async def download_and_process_file( session: AsyncSession, task_logger: TaskLoggingService, log_entry: Log, + connector_id: int | None = None, ) -> tuple[Any, str | None, dict[str, Any] | None]: """ Download Google Drive file and process using Surfsense file processors. @@ -37,6 +38,7 @@ async def download_and_process_file( session: Database session task_logger: Task logging service log_entry: Log entry for tracking + connector_id: ID of the connector (for de-indexing support) Returns: Tuple of (Document object if successful, error message if failed, file metadata dict) @@ -92,6 +94,9 @@ async def download_and_process_file( "source_connector": "google_drive", }, } + # Include connector_id for de-indexing support + if connector_id is not None: + connector_info["connector_id"] = connector_id # Add additional Drive metadata if available if "modifiedTime" in file: diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 615a7ddb9..4148b8e38 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -760,9 +760,18 @@ class Document(BaseModel, TimestampMixin): index=True, ) + # Track which connector created this document (for cleanup on connector deletion) + connector_id = Column( + Integer, + ForeignKey("search_source_connectors.id", ondelete="SET NULL"), + nullable=True, # Nullable for manually uploaded docs without connector + index=True, + ) + # Relationships search_space = relationship("SearchSpace", back_populates="documents") created_by = relationship("User", back_populates="documents") + connector = relationship("SearchSourceConnector", back_populates="documents") chunks = relationship( "Chunk", back_populates="document", cascade="all, delete-orphan" ) @@ -991,6 +1000,9 @@ class SearchSourceConnector(BaseModel, TimestampMixin): UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False ) + # Documents created by this connector (for cleanup on connector deletion) + documents = relationship("Document", back_populates="connector") + class NewLLMConfig(BaseModel, TimestampMixin): """ diff --git a/surfsense_backend/app/routes/circleback_webhook_route.py b/surfsense_backend/app/routes/circleback_webhook_route.py index 1285aadeb..4a5823645 100644 --- a/surfsense_backend/app/routes/circleback_webhook_route.py +++ b/surfsense_backend/app/routes/circleback_webhook_route.py @@ -9,8 +9,12 @@ import logging from datetime import datetime from typing import Any -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import SearchSourceConnector, SearchSourceConnectorType, get_async_session logger = logging.getLogger(__name__) @@ -212,6 +216,7 @@ def format_circleback_meeting_to_markdown(payload: CirclebackWebhookPayload) -> async def receive_circleback_webhook( search_space_id: int, payload: CirclebackWebhookPayload, + session: AsyncSession = Depends(get_async_session), ): """ Receive and process a Circleback webhook. @@ -223,6 +228,7 @@ async def receive_circleback_webhook( Args: search_space_id: The ID of the search space to save the document to payload: The Circleback webhook payload containing meeting data + session: Database session for looking up the connector Returns: Success message with document details @@ -236,6 +242,26 @@ async def receive_circleback_webhook( f"Received Circleback webhook for meeting {payload.id} in search space {search_space_id}" ) + # Look up the Circleback connector for this search space + connector_result = await session.execute( + select(SearchSourceConnector.id).where( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.CIRCLEBACK_CONNECTOR, + ) + ) + connector_id = connector_result.scalar_one_or_none() + + if connector_id: + logger.info( + f"Found Circleback connector {connector_id} for search space {search_space_id}" + ) + else: + logger.warning( + f"No Circleback connector found for search space {search_space_id}. " + "Document will be created without connector_id." + ) + # Convert to markdown markdown_content = format_circleback_meeting_to_markdown(payload) @@ -264,6 +290,7 @@ async def receive_circleback_webhook( markdown_content=markdown_content, metadata=meeting_metadata, search_space_id=search_space_id, + connector_id=connector_id, ) logger.info( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 3a937653d..e8ee0282d 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -524,9 +524,17 @@ async def delete_search_source_connector( user: User = Depends(current_active_user), ): """ - Delete a search source connector. + Delete a search source connector and all its associated documents. + + The deletion runs in background via Celery task. User is notified + via the notification system when complete (no polling required). + Requires CONNECTORS_DELETE permission. """ + from app.tasks.celery_tasks.connector_deletion_task import ( + delete_connector_with_documents_task, + ) + try: # Get the connector first result = await session.execute( @@ -548,7 +556,12 @@ async def delete_search_source_connector( "You don't have permission to delete this connector", ) - # Delete any periodic schedule associated with this connector + # Store connector info before we queue the deletion task + connector_name = db_connector.name + connector_type = db_connector.connector_type.value + search_space_id = db_connector.search_space_id + + # Delete any periodic schedule associated with this connector (lightweight, sync) if db_connector.periodic_indexing_enabled: success = delete_periodic_schedule(connector_id) if not success: @@ -556,7 +569,7 @@ async def delete_search_source_connector( f"Failed to delete periodic schedule for connector {connector_id}" ) - # For Composio connectors, also delete the connected account in Composio + # For Composio connectors, delete the connected account in Composio (lightweight API call, sync) composio_connector_types = [ SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, @@ -588,16 +601,33 @@ async def delete_search_source_connector( f"Error deleting Composio connected account {composio_connected_account_id}: {composio_error!s}" ) - await session.delete(db_connector) - await session.commit() - return {"message": "Search source connector deleted successfully"} + # Queue background task to delete documents and connector + # This handles potentially large document counts without blocking the API + delete_connector_with_documents_task.delay( + connector_id=connector_id, + user_id=str(user.id), + search_space_id=search_space_id, + connector_name=connector_name, + connector_type=connector_type, + ) + + logger.info( + f"Queued deletion task for connector {connector_id} ({connector_name})" + ) + + return { + "message": "Connector deletion started. You will be notified when complete.", + "status": "queued", + "connector_id": connector_id, + "connector_name": connector_name, + } except HTTPException: raise except Exception as e: await session.rollback() raise HTTPException( status_code=500, - detail=f"Failed to delete search source connector: {e!s}", + detail=f"Failed to start connector deletion: {e!s}", ) from e diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py new file mode 100644 index 000000000..b794c58d0 --- /dev/null +++ b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py @@ -0,0 +1,272 @@ +"""Celery task for background connector deletion. + +This task handles the deletion of all documents associated with a connector +in the background, then deletes the connector itself. User is notified via +the notification system when complete (no polling required). + +Features: +- Batch deletion to handle large document counts +- Automatic retry on failure +- Progress tracking via notifications +- Handles both success and failure notifications +""" + +import asyncio +import logging +from uuid import UUID + +from sqlalchemy import delete, func, select +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine +from sqlalchemy.pool import NullPool + +from app.celery_app import celery_app +from app.config import config +from app.db import Document, Notification, SearchSourceConnector + +logger = logging.getLogger(__name__) + +# Batch size for document deletion +DELETION_BATCH_SIZE = 500 + + +def _get_celery_session_maker(): + """Create async session maker for Celery tasks.""" + engine = create_async_engine( + config.DATABASE_URL, + poolclass=NullPool, + echo=False, + ) + return async_sessionmaker(engine, expire_on_commit=False), engine + + +@celery_app.task( + bind=True, + name="delete_connector_with_documents", + max_retries=3, + default_retry_delay=60, + autoretry_for=(Exception,), + retry_backoff=True, +) +def delete_connector_with_documents_task( + self, + connector_id: int, + user_id: str, + search_space_id: int, + connector_name: str, + connector_type: str, +): + """ + Background task to delete a connector and all its associated documents. + + Creates a notification when complete (success or failure). + No polling required - user sees notification in UI. + + Args: + connector_id: ID of the connector to delete + user_id: ID of the user who initiated the deletion + search_space_id: ID of the search space + connector_name: Name of the connector (for notification message) + connector_type: Type of the connector (for logging) + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + return loop.run_until_complete( + _delete_connector_async( + connector_id=connector_id, + user_id=user_id, + search_space_id=search_space_id, + connector_name=connector_name, + connector_type=connector_type, + ) + ) + finally: + loop.close() + + +async def _delete_connector_async( + connector_id: int, + user_id: str, + search_space_id: int, + connector_name: str, + connector_type: str, +) -> dict: + """ + Async implementation of connector deletion. + + Steps: + 1. Count total documents to delete + 2. Delete documents in batches (chunks cascade automatically) + 3. Delete the connector record + 4. Create success notification + + On failure, creates failure notification and re-raises exception. + """ + session_maker, engine = _get_celery_session_maker() + total_deleted = 0 + + try: + async with session_maker() as session: + # Step 1: Count total documents for this connector + count_result = await session.execute( + select(func.count(Document.id)).where( + Document.connector_id == connector_id + ) + ) + total_docs = count_result.scalar() or 0 + + logger.info( + f"Starting deletion of connector {connector_id} ({connector_name}). " + f"Documents to delete: {total_docs}" + ) + + # Step 2: Delete documents in batches + while True: + # Get batch of document IDs + result = await session.execute( + select(Document.id) + .where(Document.connector_id == connector_id) + .limit(DELETION_BATCH_SIZE) + ) + doc_ids = [row[0] for row in result.fetchall()] + + if not doc_ids: + break + + # Delete this batch (chunks are deleted via CASCADE) + await session.execute( + delete(Document).where(Document.id.in_(doc_ids)) + ) + await session.commit() + + total_deleted += len(doc_ids) + logger.info( + f"Deleted batch of {len(doc_ids)} documents. " + f"Progress: {total_deleted}/{total_docs}" + ) + + # Step 3: Delete the connector record + result = await session.execute( + select(SearchSourceConnector).where( + SearchSourceConnector.id == connector_id + ) + ) + connector = result.scalar_one_or_none() + + if connector: + await session.delete(connector) + logger.info(f"Deleted connector record: {connector_id}") + else: + logger.warning( + f"Connector {connector_id} not found - may have been already deleted" + ) + + # Step 4: Create success notification + doc_text = "document" if total_deleted == 1 else "documents" + notification = Notification( + user_id=UUID(user_id), + search_space_id=search_space_id, + type="connector_deletion", + title=f"{connector_name} Removed", + message=f"Connector and {total_deleted} {doc_text} have been removed from your knowledge base.", + notification_metadata={ + "connector_id": connector_id, + "connector_name": connector_name, + "connector_type": connector_type, + "documents_deleted": total_deleted, + "status": "completed", + }, + ) + session.add(notification) + await session.commit() + + logger.info( + f"Connector {connector_id} ({connector_name}) deleted successfully. " + f"Total documents deleted: {total_deleted}" + ) + + return { + "status": "success", + "connector_id": connector_id, + "connector_name": connector_name, + "documents_deleted": total_deleted, + } + + except Exception as e: + logger.error( + f"Failed to delete connector {connector_id} ({connector_name}): {e!s}", + exc_info=True, + ) + + # Create failure notification + try: + async with session_maker() as session: + notification = Notification( + user_id=UUID(user_id), + search_space_id=search_space_id, + type="connector_deletion", + title=f"Failed to Remove {connector_name}", + message="Something went wrong while removing this connector. Please try again.", + notification_metadata={ + "connector_id": connector_id, + "connector_name": connector_name, + "connector_type": connector_type, + "documents_deleted": total_deleted, + "status": "failed", + "error": str(e), + }, + ) + session.add(notification) + await session.commit() + except Exception as notify_error: + logger.error( + f"Failed to create failure notification: {notify_error!s}", + exc_info=True, + ) + + # Re-raise to trigger Celery retry + raise + + finally: + await engine.dispose() + + +async def delete_documents_by_connector_id( + session, + connector_id: int, + batch_size: int = DELETION_BATCH_SIZE, +) -> int: + """ + Delete all documents associated with a connector in batches. + + This is a utility function that can be used independently of the Celery task + for synchronous deletion scenarios (e.g., small document counts). + + Args: + session: AsyncSession instance + connector_id: ID of the connector + batch_size: Number of documents to delete per batch + + Returns: + Total number of documents deleted + """ + total_deleted = 0 + + while True: + result = await session.execute( + select(Document.id) + .where(Document.connector_id == connector_id) + .limit(batch_size) + ) + doc_ids = [row[0] for row in result.fetchall()] + + if not doc_ids: + break + + await session.execute(delete(Document).where(Document.id.in_(doc_ids))) + await session.commit() + total_deleted += len(doc_ids) + + return total_deleted + diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index f21ff5a30..7a3933091 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -505,6 +505,7 @@ def process_circleback_meeting_task( markdown_content: str, metadata: dict, search_space_id: int, + connector_id: int | None = None, ): """ Celery task to process Circleback meeting webhook data. @@ -515,6 +516,7 @@ def process_circleback_meeting_task( markdown_content: Meeting content formatted as markdown metadata: Meeting metadata dictionary search_space_id: ID of the search space + connector_id: ID of the Circleback connector (for deletion support) """ import asyncio @@ -529,6 +531,7 @@ def process_circleback_meeting_task( markdown_content, metadata, search_space_id, + connector_id, ) ) finally: @@ -541,6 +544,7 @@ async def _process_circleback_meeting( markdown_content: str, metadata: dict, search_space_id: int, + connector_id: int | None = None, ): """Process Circleback meeting with new session.""" from app.tasks.document_processors.circleback_processor import ( @@ -597,6 +601,7 @@ async def _process_circleback_meeting( markdown_content=markdown_content, metadata=metadata, search_space_id=search_space_id, + connector_id=connector_id, ) if result: diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 7d0837ac1..029c4a87c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -418,6 +418,7 @@ async def index_airtable_records( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index fd89792e9..fe608a8c9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -397,6 +397,7 @@ async def index_bookstack_pages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index bcdb9c72a..a8991647c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -396,6 +396,7 @@ async def index_clickup_tasks( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 3f8f43669..24859e685 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -403,6 +403,7 @@ async def index_confluence_pages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 3d226ed06..4999ba6d4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -528,6 +528,7 @@ async def index_discord_messages( unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 6f2dd797f..fb6487474 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -293,6 +293,7 @@ async def index_elasticsearch_documents( search_space_id=search_space_id, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) # Create chunks and attach to document (persist via relationship) diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 947035048..d82f18944 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -220,6 +220,7 @@ async def index_github_repos( user_id=user_id, task_logger=task_logger, log_entry=log_entry, + connector_id=connector_id, ) documents_processed += docs_created @@ -292,6 +293,7 @@ async def _process_repository_digest( user_id: str, task_logger: TaskLoggingService, log_entry, + connector_id: int, ) -> int: """ Process a repository digest and create documents. @@ -427,6 +429,7 @@ async def _process_repository_digest( chunks=chunks_data, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 28037ba7e..386c9de43 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -500,6 +500,7 @@ async def index_google_calendar_events( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 3cd59674e..151c1abbc 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -767,6 +767,7 @@ async def _process_single_file( session=session, task_logger=task_logger, log_entry=log_entry, + connector_id=connector_id, ) if error: diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 7c6b9ffec..34d06d796 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -413,7 +413,6 @@ async def index_google_gmail_messages( "subject": subject, "sender": sender, "date": date_str, - "connector_id": connector_id, }, content=summary_content, content_hash=content_hash, @@ -422,6 +421,7 @@ async def index_google_gmail_messages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 6262e8535..6971703c1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -381,6 +381,7 @@ async def index_jira_issues( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index dd0483eda..a94420bc2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -414,6 +414,7 @@ async def index_linear_issues( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 74e809384..c0eb58d1d 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -477,6 +477,7 @@ async def index_luma_events( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 169dbd775..b1adeb035 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -398,6 +398,7 @@ async def index_notion_pages( } existing_document.chunks = chunks existing_document.updated_at = get_current_timestamp() + existing_document.connector_id = connector_id documents_indexed += 1 logger.info(f"Successfully updated Notion page: {page_title}") @@ -471,6 +472,7 @@ async def index_notion_pages( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index a2ccd64d9..cfc321df1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -501,6 +501,7 @@ async def index_obsidian_vault( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(new_document) diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index d922178ce..3cb4e3c85 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -390,6 +390,7 @@ async def index_slack_messages( unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 7b401f6cf..1e26fbc42 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -431,6 +431,7 @@ async def index_teams_messages( unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index 63105d7a5..cb11a6ec2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -372,6 +372,7 @@ async def index_crawled_urls( chunks=chunks, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py index ce596d579..f412b51dd 100644 --- a/surfsense_backend/app/tasks/document_processors/circleback_processor.py +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -42,6 +42,7 @@ async def add_circleback_meeting_document( markdown_content: str, metadata: dict[str, Any], search_space_id: int, + connector_id: int | None = None, ) -> Document | None: """ Process and store a Circleback meeting document. @@ -53,6 +54,7 @@ async def add_circleback_meeting_document( markdown_content: Meeting content formatted as markdown metadata: Meeting metadata dictionary search_space_id: ID of the search space + connector_id: ID of the Circleback connector (for deletion support) Returns: Document object if successful, None if failed or duplicate @@ -169,6 +171,9 @@ async def add_circleback_meeting_document( existing_document.blocknote_document = blocknote_json existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() + # Ensure connector_id is set (backfill for documents created before this field) + if connector_id is not None: + existing_document.connector_id = connector_id await session.commit() await session.refresh(existing_document) @@ -192,6 +197,7 @@ async def add_circleback_meeting_document( content_needs_reindexing=False, updated_at=get_current_timestamp(), created_by_id=created_by_user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 2f2e5a2e8..674773463 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -527,6 +527,7 @@ async def add_received_file_document_using_unstructured( content_needs_reindexing=False, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -667,6 +668,7 @@ async def add_received_file_document_using_llamacloud( content_needs_reindexing=False, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -832,6 +834,7 @@ async def add_received_file_document_using_docling( content_needs_reindexing=False, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -852,7 +855,7 @@ async def add_received_file_document_using_docling( async def _update_document_from_connector( document: Document | None, connector: dict | None, session: AsyncSession ) -> None: - """Helper to update document type and metadata from connector info.""" + """Helper to update document type, metadata, and connector_id from connector info.""" if document and connector: if "type" in connector: document.document_type = connector["type"] @@ -864,6 +867,9 @@ async def _update_document_from_connector( # Expand existing metadata with connector metadata merged = {**document.document_metadata, **connector["metadata"]} document.document_metadata = merged + # Set connector_id if provided for de-indexing support + if "connector_id" in connector: + document.connector_id = connector["connector_id"] await session.commit() diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index a2399206a..ff85d962e 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -296,6 +296,7 @@ async def add_received_markdown_file_document( blocknote_document=blocknote_json, updated_at=get_current_timestamp(), created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) diff --git a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx index 9f73d1a82..9ef49c0d8 100644 --- a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx +++ b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx @@ -237,7 +237,7 @@ export function InboxSidebar({ const currentDataSource = activeTab === "mentions" ? mentions : status; const { loading, loadingMore = false, hasMore = false, loadMore } = currentDataSource; - // Status tab includes: connector indexing, document processing, page limit exceeded + // Status tab includes: connector indexing, document processing, page limit exceeded, connector deletion // Filter to only show status notification types const statusItems = useMemo( () => @@ -245,7 +245,8 @@ export function InboxSidebar({ (item) => item.type === "connector_indexing" || item.type === "document_processing" || - item.type === "page_limit_exceeded" + item.type === "page_limit_exceeded" || + item.type === "connector_deletion" ), [status.items] );