diff --git a/surfsense_backend/alembic/versions/86_add_document_created_by.py b/surfsense_backend/alembic/versions/86_add_document_created_by.py new file mode 100644 index 000000000..ea2e709c8 --- /dev/null +++ b/surfsense_backend/alembic/versions/86_add_document_created_by.py @@ -0,0 +1,125 @@ +"""Add created_by_id column to documents table for document ownership tracking + +Revision ID: 86 +Revises: 85 +Create Date: 2026-02-02 + +Changes: +1. Add created_by_id column (UUID, nullable, foreign key to user.id) +2. Create index on created_by_id for performance +3. Backfill existing documents with search space owner's user_id +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "86" +down_revision: str | None = "85" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add created_by_id column to documents and backfill with search space owner.""" + + # 1. Add created_by_id column (nullable for backward compatibility) + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'created_by_id' + ) THEN + ALTER TABLE documents + ADD COLUMN created_by_id UUID; + END IF; + END$$; + """ + ) + + # 2. Create index on created_by_id for efficient queries + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_documents_created_by_id + ON documents (created_by_id); + """ + ) + + # 3. Add foreign key constraint with ON DELETE SET NULL + # First check if constraint already exists + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_created_by_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + ADD CONSTRAINT fk_documents_created_by_id + FOREIGN KEY (created_by_id) REFERENCES "user"(id) + ON DELETE SET NULL; + END IF; + END$$; + """ + ) + + # 4. Backfill existing documents with search space owner's user_id + # This ensures all existing documents are associated with the search space owner + op.execute( + """ + UPDATE documents + SET created_by_id = searchspaces.user_id + FROM searchspaces + WHERE documents.search_space_id = searchspaces.id + AND documents.created_by_id IS NULL; + """ + ) + + +def downgrade() -> None: + """Remove created_by_id column from documents.""" + + # Drop foreign key constraint + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_created_by_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + DROP CONSTRAINT fk_documents_created_by_id; + END IF; + END$$; + """ + ) + + # Drop index + op.execute( + """ + DROP INDEX IF EXISTS ix_documents_created_by_id; + """ + ) + + # Drop column + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'created_by_id' + ) THEN + ALTER TABLE documents + DROP COLUMN created_by_id; + END IF; + END$$; + """ + ) diff --git a/surfsense_backend/alembic/versions/87_add_document_connector_id.py b/surfsense_backend/alembic/versions/87_add_document_connector_id.py new file mode 100644 index 000000000..75abe8ab9 --- /dev/null +++ b/surfsense_backend/alembic/versions/87_add_document_connector_id.py @@ -0,0 +1,170 @@ +"""Add connector_id column to documents table for linking documents to their source connector + +Revision ID: 87 +Revises: 86 +Create Date: 2026-02-02 + +Changes: +1. Add connector_id column (Integer, nullable, foreign key to search_source_connectors.id) +2. Create index on connector_id for efficient bulk deletion queries +3. SET NULL on delete - allows controlled cleanup in application code +4. Backfill existing documents based on document_type and search_space_id matching +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "87" +down_revision: str | None = "86" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add connector_id column to documents and backfill from existing connectors.""" + + # 1. Add connector_id column (nullable - for manually uploaded docs without connector) + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'connector_id' + ) THEN + ALTER TABLE documents + ADD COLUMN connector_id INTEGER; + END IF; + END$$; + """ + ) + + # 2. Create index on connector_id for efficient cleanup queries + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_documents_connector_id + ON documents (connector_id); + """ + ) + + # 3. Add foreign key constraint with ON DELETE SET NULL + # SET NULL allows us to delete documents in controlled batches before deleting connector + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_connector_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + ADD CONSTRAINT fk_documents_connector_id + FOREIGN KEY (connector_id) REFERENCES search_source_connectors(id) + ON DELETE SET NULL; + END IF; + END$$; + """ + ) + + # 4. Backfill existing documents with connector_id based on document_type matching + # This maps document types to their corresponding connector types + # Only backfills for documents in search spaces that have exactly one connector of that type + + # Map of document_type -> connector_type for backfilling + document_connector_mappings = [ + ("NOTION_CONNECTOR", "NOTION_CONNECTOR"), + ("SLACK_CONNECTOR", "SLACK_CONNECTOR"), + ("TEAMS_CONNECTOR", "TEAMS_CONNECTOR"), + ("GITHUB_CONNECTOR", "GITHUB_CONNECTOR"), + ("LINEAR_CONNECTOR", "LINEAR_CONNECTOR"), + ("DISCORD_CONNECTOR", "DISCORD_CONNECTOR"), + ("JIRA_CONNECTOR", "JIRA_CONNECTOR"), + ("CONFLUENCE_CONNECTOR", "CONFLUENCE_CONNECTOR"), + ("CLICKUP_CONNECTOR", "CLICKUP_CONNECTOR"), + ("GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR"), + ("GOOGLE_GMAIL_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR"), + ("GOOGLE_DRIVE_FILE", "GOOGLE_DRIVE_CONNECTOR"), + ("AIRTABLE_CONNECTOR", "AIRTABLE_CONNECTOR"), + ("LUMA_CONNECTOR", "LUMA_CONNECTOR"), + ("ELASTICSEARCH_CONNECTOR", "ELASTICSEARCH_CONNECTOR"), + ("BOOKSTACK_CONNECTOR", "BOOKSTACK_CONNECTOR"), + ("CIRCLEBACK", "CIRCLEBACK_CONNECTOR"), + ("OBSIDIAN_CONNECTOR", "OBSIDIAN_CONNECTOR"), + ("COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"), + ("COMPOSIO_GMAIL_CONNECTOR", "COMPOSIO_GMAIL_CONNECTOR"), + ("COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"), + ("CRAWLED_URL", "WEBCRAWLER_CONNECTOR"), + ] + + for doc_type, connector_type in document_connector_mappings: + # Backfill connector_id for documents where: + # 1. Document has this document_type + # 2. Document doesn't already have a connector_id + # 3. There's exactly one connector of this type in the same search space + # This safely handles most cases while avoiding ambiguity + op.execute( + f""" + UPDATE documents d + SET connector_id = ( + SELECT ssc.id + FROM search_source_connectors ssc + WHERE ssc.search_space_id = d.search_space_id + AND ssc.connector_type = '{connector_type}' + LIMIT 1 + ) + WHERE d.document_type = '{doc_type}' + AND d.connector_id IS NULL + AND EXISTS ( + SELECT 1 FROM search_source_connectors ssc + WHERE ssc.search_space_id = d.search_space_id + AND ssc.connector_type = '{connector_type}' + ); + """ + ) + + +def downgrade() -> None: + """Remove connector_id column from documents.""" + + # Drop foreign key constraint + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'fk_documents_connector_id' + AND table_name = 'documents' + ) THEN + ALTER TABLE documents + DROP CONSTRAINT fk_documents_connector_id; + END IF; + END$$; + """ + ) + + # Drop index + op.execute( + """ + DROP INDEX IF EXISTS ix_documents_connector_id; + """ + ) + + # Drop column + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'connector_id' + ) THEN + ALTER TABLE documents + DROP COLUMN connector_id; + END IF; + END$$; + """ + ) diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index b690f6096..74b21fbf0 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -81,6 +81,7 @@ celery_app = Celery( "app.tasks.celery_tasks.blocknote_migration_tasks", "app.tasks.celery_tasks.document_reindex_tasks", "app.tasks.celery_tasks.stale_notification_cleanup_task", + "app.tasks.celery_tasks.connector_deletion_task", ], ) diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py index d3a0d344b..1964a4d45 100644 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -394,6 +394,8 @@ async def _process_gmail_message_batch( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index 4302e479b..78ff360ca 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -442,6 +442,8 @@ async def index_composio_google_calendar( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 364712215..369f5d8b3 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -1248,7 +1248,6 @@ async def _process_single_drive_file( "file_name": file_name, "FILE_NAME": file_name, # For compatibility "mime_type": mime_type, - "connector_id": connector_id, "toolkit_id": "googledrive", "source": "composio", }, @@ -1258,6 +1257,8 @@ async def _process_single_drive_file( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 61b427970..39a92f95f 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -25,6 +25,7 @@ async def download_and_process_file( session: AsyncSession, task_logger: TaskLoggingService, log_entry: Log, + connector_id: int | None = None, ) -> tuple[Any, str | None, dict[str, Any] | None]: """ Download Google Drive file and process using Surfsense file processors. @@ -37,6 +38,7 @@ async def download_and_process_file( session: Database session task_logger: Task logging service log_entry: Log entry for tracking + connector_id: ID of the connector (for de-indexing support) Returns: Tuple of (Document object if successful, error message if failed, file metadata dict) @@ -92,6 +94,9 @@ async def download_and_process_file( "source_connector": "google_drive", }, } + # Include connector_id for de-indexing support + if connector_id is not None: + connector_info["connector_id"] = connector_id # Add additional Drive metadata if available if "modifiedTime" in file: diff --git a/surfsense_backend/app/connectors/google_drive/credentials.py b/surfsense_backend/app/connectors/google_drive/credentials.py index 5b1900ab2..951d8e3e3 100644 --- a/surfsense_backend/app/connectors/google_drive/credentials.py +++ b/surfsense_backend/app/connectors/google_drive/credentials.py @@ -127,7 +127,12 @@ async def get_valid_credentials( ) creds_dict["_token_encrypted"] = True - connector.config = creds_dict + # IMPORTANT: Merge new credentials with existing config to preserve + # user settings like selected_folders, selected_files, indexing_options, + # folder_tokens, etc. that would otherwise be wiped on token refresh. + existing_config = connector.config.copy() if connector.config else {} + existing_config.update(creds_dict) + connector.config = existing_config flag_modified(connector, "config") await session.commit() diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 56e39c2e7..4148b8e38 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -751,7 +751,27 @@ class Document(BaseModel, TimestampMixin): search_space_id = Column( Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False ) + + # Track who created/uploaded this document + created_by_id = Column( + UUID(as_uuid=True), + ForeignKey("user.id", ondelete="SET NULL"), + nullable=True, # Nullable for backward compatibility with existing records + index=True, + ) + + # Track which connector created this document (for cleanup on connector deletion) + connector_id = Column( + Integer, + ForeignKey("search_source_connectors.id", ondelete="SET NULL"), + nullable=True, # Nullable for manually uploaded docs without connector + index=True, + ) + + # Relationships search_space = relationship("SearchSpace", back_populates="documents") + created_by = relationship("User", back_populates="documents") + connector = relationship("SearchSourceConnector", back_populates="documents") chunks = relationship( "Chunk", back_populates="document", cascade="all, delete-orphan" ) @@ -980,6 +1000,9 @@ class SearchSourceConnector(BaseModel, TimestampMixin): UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False ) + # Documents created by this connector (for cleanup on connector deletion) + documents = relationship("Document", back_populates="connector") + class NewLLMConfig(BaseModel, TimestampMixin): """ @@ -1286,6 +1309,13 @@ if config.AUTH_TYPE == "GOOGLE": passive_deletes=True, ) + # Documents created/uploaded by this user + documents = relationship( + "Document", + back_populates="created_by", + passive_deletes=True, + ) + # User memories for personalized AI responses memories = relationship( "UserMemory", @@ -1344,6 +1374,13 @@ else: passive_deletes=True, ) + # Documents created/uploaded by this user + documents = relationship( + "Document", + back_populates="created_by", + passive_deletes=True, + ) + # User memories for personalized AI responses memories = relationship( "UserMemory", diff --git a/surfsense_backend/app/routes/circleback_webhook_route.py b/surfsense_backend/app/routes/circleback_webhook_route.py index 1285aadeb..4a5823645 100644 --- a/surfsense_backend/app/routes/circleback_webhook_route.py +++ b/surfsense_backend/app/routes/circleback_webhook_route.py @@ -9,8 +9,12 @@ import logging from datetime import datetime from typing import Any -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import SearchSourceConnector, SearchSourceConnectorType, get_async_session logger = logging.getLogger(__name__) @@ -212,6 +216,7 @@ def format_circleback_meeting_to_markdown(payload: CirclebackWebhookPayload) -> async def receive_circleback_webhook( search_space_id: int, payload: CirclebackWebhookPayload, + session: AsyncSession = Depends(get_async_session), ): """ Receive and process a Circleback webhook. @@ -223,6 +228,7 @@ async def receive_circleback_webhook( Args: search_space_id: The ID of the search space to save the document to payload: The Circleback webhook payload containing meeting data + session: Database session for looking up the connector Returns: Success message with document details @@ -236,6 +242,26 @@ async def receive_circleback_webhook( f"Received Circleback webhook for meeting {payload.id} in search space {search_space_id}" ) + # Look up the Circleback connector for this search space + connector_result = await session.execute( + select(SearchSourceConnector.id).where( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.CIRCLEBACK_CONNECTOR, + ) + ) + connector_id = connector_result.scalar_one_or_none() + + if connector_id: + logger.info( + f"Found Circleback connector {connector_id} for search space {search_space_id}" + ) + else: + logger.warning( + f"No Circleback connector found for search space {search_space_id}. " + "Document will be created without connector_id." + ) + # Convert to markdown markdown_content = format_circleback_meeting_to_markdown(payload) @@ -264,6 +290,7 @@ async def receive_circleback_webhook( markdown_content=markdown_content, metadata=meeting_metadata, search_space_id=search_space_id, + connector_id=connector_id, ) logger.info( diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py index a28361132..602aa876c 100644 --- a/surfsense_backend/app/routes/composio_routes.py +++ b/surfsense_backend/app/routes/composio_routes.py @@ -20,6 +20,7 @@ from pydantic import ValidationError from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select +from sqlalchemy.orm.attributes import flag_modified from app.config import config from app.db import ( @@ -330,10 +331,19 @@ async def composio_callback( ) # Update existing connector with new connected_account_id + # IMPORTANT: Merge new credentials with existing config to preserve + # user settings like selected_folders, selected_files, indexing_options, + # drive_page_token, etc. that would otherwise be wiped on reconnection. logger.info( f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}" ) - existing_connector.config = connector_config + existing_config = ( + existing_connector.config.copy() if existing_connector.config else {} + ) + existing_config.update(connector_config) + existing_connector.config = existing_config + + flag_modified(existing_connector, "config") await session.commit() await session.refresh(existing_connector) diff --git a/surfsense_backend/app/routes/notes_routes.py b/surfsense_backend/app/routes/notes_routes.py index 5bb0a88a9..928cd462a 100644 --- a/surfsense_backend/app/routes/notes_routes.py +++ b/surfsense_backend/app/routes/notes_routes.py @@ -76,6 +76,7 @@ async def create_note( document_metadata={"NOTE": True}, embedding=None, # Will be generated on first reindex updated_at=datetime.now(UTC), + created_by_id=user.id, # Track who created this note ) session.add(document) @@ -93,6 +94,7 @@ async def create_note( search_space_id=document.search_space_id, created_at=document.created_at, updated_at=document.updated_at, + created_by_id=document.created_by_id, ) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 3c7d66c3a..70e8f28f9 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -527,9 +527,17 @@ async def delete_search_source_connector( user: User = Depends(current_active_user), ): """ - Delete a search source connector. + Delete a search source connector and all its associated documents. + + The deletion runs in background via Celery task. User is notified + via the notification system when complete (no polling required). + Requires CONNECTORS_DELETE permission. """ + from app.tasks.celery_tasks.connector_deletion_task import ( + delete_connector_with_documents_task, + ) + try: # Get the connector first result = await session.execute( @@ -551,7 +559,12 @@ async def delete_search_source_connector( "You don't have permission to delete this connector", ) - # Delete any periodic schedule associated with this connector + # Store connector info before we queue the deletion task + connector_name = db_connector.name + connector_type = db_connector.connector_type.value + search_space_id = db_connector.search_space_id + + # Delete any periodic schedule associated with this connector (lightweight, sync) if db_connector.periodic_indexing_enabled: success = delete_periodic_schedule(connector_id) if not success: @@ -559,7 +572,7 @@ async def delete_search_source_connector( f"Failed to delete periodic schedule for connector {connector_id}" ) - # For Composio connectors, also delete the connected account in Composio + # For Composio connectors, delete the connected account in Composio (lightweight API call, sync) composio_connector_types = [ SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, @@ -591,16 +604,33 @@ async def delete_search_source_connector( f"Error deleting Composio connected account {composio_connected_account_id}: {composio_error!s}" ) - await session.delete(db_connector) - await session.commit() - return {"message": "Search source connector deleted successfully"} + # Queue background task to delete documents and connector + # This handles potentially large document counts without blocking the API + delete_connector_with_documents_task.delay( + connector_id=connector_id, + user_id=str(user.id), + search_space_id=search_space_id, + connector_name=connector_name, + connector_type=connector_type, + ) + + logger.info( + f"Queued deletion task for connector {connector_id} ({connector_name})" + ) + + return { + "message": "Connector deletion started. You will be notified when complete.", + "status": "queued", + "connector_id": connector_id, + "connector_name": connector_name, + } except HTTPException: raise except Exception as e: await session.rollback() raise HTTPException( status_code=500, - detail=f"Failed to delete search source connector: {e!s}", + detail=f"Failed to start connector deletion: {e!s}", ) from e diff --git a/surfsense_backend/app/schemas/documents.py b/surfsense_backend/app/schemas/documents.py index 2b4bda0ca..1f82ae9ce 100644 --- a/surfsense_backend/app/schemas/documents.py +++ b/surfsense_backend/app/schemas/documents.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import TypeVar +from uuid import UUID from pydantic import BaseModel, ConfigDict @@ -51,6 +52,7 @@ class DocumentRead(BaseModel): created_at: datetime updated_at: datetime | None search_space_id: int + created_by_id: UUID | None = None # User who created/uploaded this document model_config = ConfigDict(from_attributes=True) diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py new file mode 100644 index 000000000..0fd68637c --- /dev/null +++ b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py @@ -0,0 +1,269 @@ +"""Celery task for background connector deletion. + +This task handles the deletion of all documents associated with a connector +in the background, then deletes the connector itself. User is notified via +the notification system when complete (no polling required). + +Features: +- Batch deletion to handle large document counts +- Automatic retry on failure +- Progress tracking via notifications +- Handles both success and failure notifications +""" + +import asyncio +import logging +from uuid import UUID + +from sqlalchemy import delete, func, select +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine +from sqlalchemy.pool import NullPool + +from app.celery_app import celery_app +from app.config import config +from app.db import Document, Notification, SearchSourceConnector + +logger = logging.getLogger(__name__) + +# Batch size for document deletion +DELETION_BATCH_SIZE = 500 + + +def _get_celery_session_maker(): + """Create async session maker for Celery tasks.""" + engine = create_async_engine( + config.DATABASE_URL, + poolclass=NullPool, + echo=False, + ) + return async_sessionmaker(engine, expire_on_commit=False), engine + + +@celery_app.task( + bind=True, + name="delete_connector_with_documents", + max_retries=3, + default_retry_delay=60, + autoretry_for=(Exception,), + retry_backoff=True, +) +def delete_connector_with_documents_task( + self, + connector_id: int, + user_id: str, + search_space_id: int, + connector_name: str, + connector_type: str, +): + """ + Background task to delete a connector and all its associated documents. + + Creates a notification when complete (success or failure). + No polling required - user sees notification in UI. + + Args: + connector_id: ID of the connector to delete + user_id: ID of the user who initiated the deletion + search_space_id: ID of the search space + connector_name: Name of the connector (for notification message) + connector_type: Type of the connector (for logging) + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + return loop.run_until_complete( + _delete_connector_async( + connector_id=connector_id, + user_id=user_id, + search_space_id=search_space_id, + connector_name=connector_name, + connector_type=connector_type, + ) + ) + finally: + loop.close() + + +async def _delete_connector_async( + connector_id: int, + user_id: str, + search_space_id: int, + connector_name: str, + connector_type: str, +) -> dict: + """ + Async implementation of connector deletion. + + Steps: + 1. Count total documents to delete + 2. Delete documents in batches (chunks cascade automatically) + 3. Delete the connector record + 4. Create success notification + + On failure, creates failure notification and re-raises exception. + """ + session_maker, engine = _get_celery_session_maker() + total_deleted = 0 + + try: + async with session_maker() as session: + # Step 1: Count total documents for this connector + count_result = await session.execute( + select(func.count(Document.id)).where( + Document.connector_id == connector_id + ) + ) + total_docs = count_result.scalar() or 0 + + logger.info( + f"Starting deletion of connector {connector_id} ({connector_name}). " + f"Documents to delete: {total_docs}" + ) + + # Step 2: Delete documents in batches + while True: + # Get batch of document IDs + result = await session.execute( + select(Document.id) + .where(Document.connector_id == connector_id) + .limit(DELETION_BATCH_SIZE) + ) + doc_ids = [row[0] for row in result.fetchall()] + + if not doc_ids: + break + + # Delete this batch (chunks are deleted via CASCADE) + await session.execute(delete(Document).where(Document.id.in_(doc_ids))) + await session.commit() + + total_deleted += len(doc_ids) + logger.info( + f"Deleted batch of {len(doc_ids)} documents. " + f"Progress: {total_deleted}/{total_docs}" + ) + + # Step 3: Delete the connector record + result = await session.execute( + select(SearchSourceConnector).where( + SearchSourceConnector.id == connector_id + ) + ) + connector = result.scalar_one_or_none() + + if connector: + await session.delete(connector) + logger.info(f"Deleted connector record: {connector_id}") + else: + logger.warning( + f"Connector {connector_id} not found - may have been already deleted" + ) + + # Step 4: Create success notification + doc_text = "document" if total_deleted == 1 else "documents" + notification = Notification( + user_id=UUID(user_id), + search_space_id=search_space_id, + type="connector_deletion", + title=f"{connector_name} Removed", + message=f"Connector and {total_deleted} {doc_text} have been removed from your knowledge base.", + notification_metadata={ + "connector_id": connector_id, + "connector_name": connector_name, + "connector_type": connector_type, + "documents_deleted": total_deleted, + "status": "completed", + }, + ) + session.add(notification) + await session.commit() + + logger.info( + f"Connector {connector_id} ({connector_name}) deleted successfully. " + f"Total documents deleted: {total_deleted}" + ) + + return { + "status": "success", + "connector_id": connector_id, + "connector_name": connector_name, + "documents_deleted": total_deleted, + } + + except Exception as e: + logger.error( + f"Failed to delete connector {connector_id} ({connector_name}): {e!s}", + exc_info=True, + ) + + # Create failure notification + try: + async with session_maker() as session: + notification = Notification( + user_id=UUID(user_id), + search_space_id=search_space_id, + type="connector_deletion", + title=f"Failed to Remove {connector_name}", + message="Something went wrong while removing this connector. Please try again.", + notification_metadata={ + "connector_id": connector_id, + "connector_name": connector_name, + "connector_type": connector_type, + "documents_deleted": total_deleted, + "status": "failed", + "error": str(e), + }, + ) + session.add(notification) + await session.commit() + except Exception as notify_error: + logger.error( + f"Failed to create failure notification: {notify_error!s}", + exc_info=True, + ) + + # Re-raise to trigger Celery retry + raise + + finally: + await engine.dispose() + + +async def delete_documents_by_connector_id( + session, + connector_id: int, + batch_size: int = DELETION_BATCH_SIZE, +) -> int: + """ + Delete all documents associated with a connector in batches. + + This is a utility function that can be used independently of the Celery task + for synchronous deletion scenarios (e.g., small document counts). + + Args: + session: AsyncSession instance + connector_id: ID of the connector + batch_size: Number of documents to delete per batch + + Returns: + Total number of documents deleted + """ + total_deleted = 0 + + while True: + result = await session.execute( + select(Document.id) + .where(Document.connector_id == connector_id) + .limit(batch_size) + ) + doc_ids = [row[0] for row in result.fetchall()] + + if not doc_ids: + break + + await session.execute(delete(Document).where(Document.id.in_(doc_ids))) + await session.commit() + total_deleted += len(doc_ids) + + return total_deleted diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 6279510da..f310bb03e 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -545,6 +545,7 @@ def process_circleback_meeting_task( markdown_content: str, metadata: dict, search_space_id: int, + connector_id: int | None = None, ): """ Celery task to process Circleback meeting webhook data. @@ -555,6 +556,7 @@ def process_circleback_meeting_task( markdown_content: Meeting content formatted as markdown metadata: Meeting metadata dictionary search_space_id: ID of the search space + connector_id: ID of the Circleback connector (for deletion support) """ import asyncio @@ -569,6 +571,7 @@ def process_circleback_meeting_task( markdown_content, metadata, search_space_id, + connector_id, ) ) finally: @@ -581,6 +584,7 @@ async def _process_circleback_meeting( markdown_content: str, metadata: dict, search_space_id: int, + connector_id: int | None = None, ): """Process Circleback meeting with new session.""" from app.tasks.document_processors.circleback_processor import ( @@ -637,6 +641,7 @@ async def _process_circleback_meeting( markdown_content=markdown_content, metadata=metadata, search_space_id=search_space_id, + connector_id=connector_id, ) if result: diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 3bcf95d6a..029c4a87c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -417,6 +417,8 @@ async def index_airtable_records( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index d726e5d95..fe608a8c9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -396,6 +396,8 @@ async def index_bookstack_pages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index e7e8b23e5..a8991647c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -395,6 +395,8 @@ async def index_clickup_tasks( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 2f20472d2..24859e685 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -402,6 +402,8 @@ async def index_confluence_pages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index a70bc42d4..4999ba6d4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -527,6 +527,8 @@ async def index_discord_messages( content_hash=content_hash, unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 8fbba6463..fb6487474 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -292,6 +292,8 @@ async def index_elasticsearch_documents( document_metadata=metadata, search_space_id=search_space_id, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) # Create chunks and attach to document (persist via relationship) diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index b01d235cf..d82f18944 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -220,6 +220,7 @@ async def index_github_repos( user_id=user_id, task_logger=task_logger, log_entry=log_entry, + connector_id=connector_id, ) documents_processed += docs_created @@ -292,6 +293,7 @@ async def _process_repository_digest( user_id: str, task_logger: TaskLoggingService, log_entry, + connector_id: int, ) -> int: """ Process a repository digest and create documents. @@ -426,6 +428,8 @@ async def _process_repository_digest( search_space_id=search_space_id, chunks=chunks_data, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index f64a7a5c3..386c9de43 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -499,6 +499,8 @@ async def index_google_calendar_events( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 3cd59674e..151c1abbc 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -767,6 +767,7 @@ async def _process_single_file( session=session, task_logger=task_logger, log_entry=log_entry, + connector_id=connector_id, ) if error: diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 45ce91c6f..34d06d796 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -413,7 +413,6 @@ async def index_google_gmail_messages( "subject": subject, "sender": sender, "date": date_str, - "connector_id": connector_id, }, content=summary_content, content_hash=content_hash, @@ -421,6 +420,8 @@ async def index_google_gmail_messages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) documents_indexed += 1 diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index acee74192..6971703c1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -380,6 +380,8 @@ async def index_jira_issues( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index fc4ae5f18..a94420bc2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -413,6 +413,8 @@ async def index_linear_issues( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index a18abf8ae..c0eb58d1d 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -476,6 +476,8 @@ async def index_luma_events( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 52622471a..b1adeb035 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -398,6 +398,7 @@ async def index_notion_pages( } existing_document.chunks = chunks existing_document.updated_at = get_current_timestamp() + existing_document.connector_id = connector_id documents_indexed += 1 logger.info(f"Successfully updated Notion page: {page_title}") @@ -470,6 +471,8 @@ async def index_notion_pages( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index a8cd78cc9..cfc321df1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -500,6 +500,8 @@ async def index_obsidian_vault( embedding=embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(new_document) diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index 5923c8089..3cb4e3c85 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -389,6 +389,8 @@ async def index_slack_messages( content_hash=content_hash, unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 162509a1e..1e26fbc42 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -430,6 +430,8 @@ async def index_teams_messages( content_hash=content_hash, unique_identifier_hash=unique_identifier_hash, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index ac16ecde6..cb11a6ec2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -371,6 +371,8 @@ async def index_crawled_urls( embedding=summary_embedding, chunks=chunks, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py index 0a1d91784..f412b51dd 100644 --- a/surfsense_backend/app/tasks/document_processors/circleback_processor.py +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -8,10 +8,17 @@ and stores it as searchable documents in the database. import logging from typing import Any +from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from app.db import Document, DocumentType +from app.db import ( + Document, + DocumentType, + SearchSourceConnector, + SearchSourceConnectorType, + SearchSpace, +) from app.services.llm_service import get_document_summary_llm from app.utils.document_converters import ( create_document_chunks, @@ -35,6 +42,7 @@ async def add_circleback_meeting_document( markdown_content: str, metadata: dict[str, Any], search_space_id: int, + connector_id: int | None = None, ) -> Document | None: """ Process and store a Circleback meeting document. @@ -46,6 +54,7 @@ async def add_circleback_meeting_document( markdown_content: Meeting content formatted as markdown metadata: Meeting metadata dictionary search_space_id: ID of the search space + connector_id: ID of the Circleback connector (for deletion support) Returns: Document object if successful, None if failed or duplicate @@ -125,6 +134,30 @@ async def add_circleback_meeting_document( **metadata, } + # Fetch the user who set up the Circleback connector (preferred) + # or fall back to search space owner if no connector found + created_by_user_id = None + + # Try to find the Circleback connector for this search space + connector_result = await session.execute( + select(SearchSourceConnector.user_id).where( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.CIRCLEBACK_CONNECTOR, + ) + ) + connector_user = connector_result.scalar_one_or_none() + + if connector_user: + # Use the user who set up the Circleback connector + created_by_user_id = connector_user + else: + # Fallback: use search space owner if no connector found + search_space_result = await session.execute( + select(SearchSpace.user_id).where(SearchSpace.id == search_space_id) + ) + created_by_user_id = search_space_result.scalar_one_or_none() + # Update or create document if existing_document: # Update existing document @@ -138,6 +171,9 @@ async def add_circleback_meeting_document( existing_document.blocknote_document = blocknote_json existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() + # Ensure connector_id is set (backfill for documents created before this field) + if connector_id is not None: + existing_document.connector_id = connector_id await session.commit() await session.refresh(existing_document) @@ -160,6 +196,8 @@ async def add_circleback_meeting_document( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=created_by_user_id, + connector_id=connector_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/extension_processor.py b/surfsense_backend/app/tasks/document_processors/extension_processor.py index 7d8462872..9ddab4ec6 100644 --- a/surfsense_backend/app/tasks/document_processors/extension_processor.py +++ b/surfsense_backend/app/tasks/document_processors/extension_processor.py @@ -185,6 +185,7 @@ async def add_extension_received_document( unique_identifier_hash=unique_identifier_hash, blocknote_document=blocknote_json, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 6c4be0cb8..674773463 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -526,6 +526,8 @@ async def add_received_file_document_using_unstructured( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -665,6 +667,8 @@ async def add_received_file_document_using_llamacloud( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -829,6 +833,8 @@ async def add_received_file_document_using_docling( blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) @@ -849,7 +855,7 @@ async def add_received_file_document_using_docling( async def _update_document_from_connector( document: Document | None, connector: dict | None, session: AsyncSession ) -> None: - """Helper to update document type and metadata from connector info.""" + """Helper to update document type, metadata, and connector_id from connector info.""" if document and connector: if "type" in connector: document.document_type = connector["type"] @@ -861,6 +867,9 @@ async def _update_document_from_connector( # Expand existing metadata with connector metadata merged = {**document.document_metadata, **connector["metadata"]} document.document_metadata = merged + # Set connector_id if provided for de-indexing support + if "connector_id" in connector: + document.connector_id = connector["connector_id"] await session.commit() diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index 3a9867fd6..ff85d962e 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -295,6 +295,8 @@ async def add_received_markdown_file_document( unique_identifier_hash=primary_hash, blocknote_document=blocknote_json, updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector.get("connector_id") if connector else None, ) session.add(document) diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py index da1a8f538..7251fb22f 100644 --- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py +++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py @@ -357,6 +357,7 @@ async def add_youtube_video_document( unique_identifier_hash=unique_identifier_hash, blocknote_document=blocknote_json, updated_at=get_current_timestamp(), + created_by_id=user_id, ) session.add(document) diff --git a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx index 9f73d1a82..9ef49c0d8 100644 --- a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx +++ b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx @@ -237,7 +237,7 @@ export function InboxSidebar({ const currentDataSource = activeTab === "mentions" ? mentions : status; const { loading, loadingMore = false, hasMore = false, loadMore } = currentDataSource; - // Status tab includes: connector indexing, document processing, page limit exceeded + // Status tab includes: connector indexing, document processing, page limit exceeded, connector deletion // Filter to only show status notification types const statusItems = useMemo( () => @@ -245,7 +245,8 @@ export function InboxSidebar({ (item) => item.type === "connector_indexing" || item.type === "document_processing" || - item.type === "page_limit_exceeded" + item.type === "page_limit_exceeded" || + item.type === "connector_deletion" ), [status.items] ); diff --git a/surfsense_web/contracts/types/inbox.types.ts b/surfsense_web/contracts/types/inbox.types.ts index 3570f2850..8e4b9ae86 100644 --- a/surfsense_web/contracts/types/inbox.types.ts +++ b/surfsense_web/contracts/types/inbox.types.ts @@ -7,6 +7,7 @@ import { documentTypeEnum } from "./document.types"; */ export const inboxItemTypeEnum = z.enum([ "connector_indexing", + "connector_deletion", "document_processing", "new_mention", "page_limit_exceeded", @@ -60,6 +61,17 @@ export const connectorIndexingMetadata = baseInboxItemMetadata.extend({ file_names: z.array(z.string()).optional(), }); +/** + * Connector deletion metadata schema + */ +export const connectorDeletionMetadata = baseInboxItemMetadata.extend({ + connector_id: z.number(), + connector_name: z.string(), + connector_type: z.string(), + documents_deleted: z.number(), + error: z.string().optional(), +}); + /** * Document processing metadata schema */ @@ -110,6 +122,7 @@ export const pageLimitExceededMetadata = baseInboxItemMetadata.extend({ */ export const inboxItemMetadata = z.union([ connectorIndexingMetadata, + connectorDeletionMetadata, documentProcessingMetadata, newMentionMetadata, pageLimitExceededMetadata, @@ -140,6 +153,11 @@ export const connectorIndexingInboxItem = inboxItem.extend({ metadata: connectorIndexingMetadata, }); +export const connectorDeletionInboxItem = inboxItem.extend({ + type: z.literal("connector_deletion"), + metadata: connectorDeletionMetadata, +}); + export const documentProcessingInboxItem = inboxItem.extend({ type: z.literal("document_processing"), metadata: documentProcessingMetadata, @@ -235,6 +253,15 @@ export function isConnectorIndexingMetadata( return connectorIndexingMetadata.safeParse(metadata).success; } +/** + * Type guard for ConnectorDeletionMetadata + */ +export function isConnectorDeletionMetadata( + metadata: unknown +): metadata is ConnectorDeletionMetadata { + return connectorDeletionMetadata.safeParse(metadata).success; +} + /** * Type guard for DocumentProcessingMetadata */ @@ -268,6 +295,7 @@ export function parseInboxItemMetadata( metadata: unknown ): | ConnectorIndexingMetadata + | ConnectorDeletionMetadata | DocumentProcessingMetadata | NewMentionMetadata | PageLimitExceededMetadata @@ -277,6 +305,10 @@ export function parseInboxItemMetadata( const result = connectorIndexingMetadata.safeParse(metadata); return result.success ? result.data : null; } + case "connector_deletion": { + const result = connectorDeletionMetadata.safeParse(metadata); + return result.success ? result.data : null; + } case "document_processing": { const result = documentProcessingMetadata.safeParse(metadata); return result.success ? result.data : null; @@ -303,12 +335,14 @@ export type InboxItemStatusEnum = z.infer; export type DocumentProcessingStageEnum = z.infer; export type BaseInboxItemMetadata = z.infer; export type ConnectorIndexingMetadata = z.infer; +export type ConnectorDeletionMetadata = z.infer; export type DocumentProcessingMetadata = z.infer; export type NewMentionMetadata = z.infer; export type PageLimitExceededMetadata = z.infer; export type InboxItemMetadata = z.infer; export type InboxItem = z.infer; export type ConnectorIndexingInboxItem = z.infer; +export type ConnectorDeletionInboxItem = z.infer; export type DocumentProcessingInboxItem = z.infer; export type NewMentionInboxItem = z.infer; export type PageLimitExceededInboxItem = z.infer;